remoteMachineTrainingService.ts 29.7 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5

'use strict';

6
import * as assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
7
8
9
10
11
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
import { Deferred } from 'ts-deferred';
import * as component from '../../common/component';
SparkSnail's avatar
SparkSnail committed
12
import { NNIError, NNIErrorNames } from '../../common/errors';
13
import { getExperimentId } from '../../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
14
15
16
import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer';
import {
17
    HyperParameters, NNIManagerIpConfig, TrainingService, TrialJobApplicationForm,
18
    TrialJobDetail, TrialJobMetric
Deshui Yu's avatar
Deshui Yu committed
19
} from '../../common/trainingService';
20
import {
21
22
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
23
24
} from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
25
import { GPUSummary } from '../common/gpuData';
26
27
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
28
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
29
30
import { GPUScheduler } from './gpuScheduler';
import {
31
    RemoteMachineMeta,
32
    RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail,
33
    ScheduleResultType, ExecutorManager
Deshui Yu's avatar
Deshui Yu committed
34
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
35
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
36
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
37
38
39
40

/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
41
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
42
class RemoteMachineTrainingService implements TrainingService {
43
    private readonly initExecutorId = "initConnection";
44
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>; //machine excutor map
45
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
46
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>; //trial excutor map
47
48
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    private readonly expRootDir: string;
49
    private trialConfig: TrialConfig | undefined;
50
    private gpuScheduler?: GPUScheduler;
51
52
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
Deshui Yu's avatar
Deshui Yu committed
53
    private stopping: boolean = false;
54
55
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
56
    private isMultiPhase: boolean = false;
SparkSnail's avatar
SparkSnail committed
57
58
    private remoteRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
59
    private versionCheck: boolean = true;
SparkSnail's avatar
SparkSnail committed
60
    private logCollection: string;
Deshui Yu's avatar
Deshui Yu committed
61
62
63
64

    /**
     * Builds the service with empty bookkeeping collections and default settings.
     *
     * @param timer injected observable timer, kept for later subscriptions
     */
    constructor(@component.Inject timer: ObservableTimer) {
        // Per-trial and per-machine bookkeeping, all empty at start.
        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
        this.jobQueue = [];
        this.metricsEmitter = new EventEmitter();

        // Environment-derived and injected members.
        this.expRootDir = getExperimentRootDir();
        this.timer = timer;
        this.log = getLogger();
        this.logCollection = 'none';

        this.log.info('Construct remote machine training service.');
    }

    /**
     * Main loop of the training service.
     *
     * Starts the REST server, then, until `stopping` is set, repeatedly tries to
     * launch the job at the head of the queue and sleeps 3 seconds between rounds.
     *
     * @throws Error carrying the REST server's error message once the server reports one
     */
    public async run(): Promise<void> {
        const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');
        while (!this.stopping) {
            while (this.jobQueue.length > 0) {
                this.updateGpuReservation();
                const trialJobId: string = this.jobQueue[0];
                const prepareResult: boolean = await this.prepareTrialJob(trialJobId);
                if (!prepareResult) {
                    // The head job could not be placed right now; retry on the next iteration.
                    break;
                }
                // Remove exactly the job that was prepared. cancelTrialJob() may have
                // spliced it out of the queue while prepareTrialJob() was awaiting, in
                // which case a blind shift() would drop a different, unlaunched job.
                const position: number = this.jobQueue.indexOf(trialJobId);
                if (position >= 0) {
                    this.jobQueue.splice(position, 1);
                }
            }
            if (restServer.getErrorMessage !== undefined) {
                this.stopping = true;
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
106

SparkSnail's avatar
SparkSnail committed
107
    /**
     * Associates a trial with the executor manager of the machine it was scheduled on.
     *
     * @param trial remote machine trial job detail; its `rmMeta` must already be set
     * @throws Error if `rmMeta` is unset or no executor manager is registered for that machine
     */
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
        const machine = trial.rmMeta;
        if (machine === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        const manager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(machine);
        if (manager === undefined) {
            throw new Error(`executorManager not initialized`);
        }

        this.trialExecutorManagerMap.set(trial.id, manager);
    }
121

SparkSnail's avatar
SparkSnail committed
122
123
    /**
     * Releases the executor held on behalf of a finished trial.
     *
     * @param trial remote machine trial job detail
     * @throws Error if `rmMeta` is unset or the trial has no executor manager assigned
     */
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
        if (trial.rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        const manager = this.trialExecutorManagerMap.get(trial.id);
        if (manager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trial.id}`);
        }

        // The entry in trialExecutorManagerMap is kept on purpose: later requests
        // from nni manager may still need to look the manager up for this trial.
        manager.releaseExecutor(trial.id);
    }
Deshui Yu's avatar
Deshui Yu committed
137
138
139
140

    /**
     * Lists all submitted trial jobs.
     *
     * Each job is fetched through getTrialJob(), one after another.
     *
     * @returns details of every job currently tracked by this service
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const details: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            details.push(await this.getTrialJob(trialJobId));
        }

        return details;
    }

    /**
     * Gets the detail of one trial job.
     *
     * Jobs in status RUNNING or UNKNOWN are passed through updateTrialJobStatus()
     * with their executor; any other job is returned as stored.
     *
     * @param trialJobId ID of trial job
     * @throws NNIError (NOT_FOUND) if the id is unknown
     * @throws Error if a RUNNING/UNKNOWN job has no `rmMeta`
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const job: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (job === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }

        //TO DO: add another job status, and design new job status change logic
        const needsRefresh: boolean = job.status === 'RUNNING' || job.status === 'UNKNOWN';
        if (!needsRefresh) {
            return job;
        }

        if (job.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }
        // Get executor where the job is running
        const executor = await this.getExecutor(job.id);

        return this.updateTrialJobStatus(job, executor);
    }

    /**
     * Registers a callback for the 'metric' event of the internal emitter.
     *
     * @param listener callback listener
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.addListener('metric', listener);
    }

    /**
     * Unregisters a callback from the 'metric' event of the internal emitter.
     *
     * @param listener callback listener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.removeListener('metric', listener);
    }

    /**
     * Submits a trial job: creates a WAITING job record and appends it to the job queue.
     *
     * @param form trial job description form
     * @returns the newly created job detail
     * @throws Error if the trial config has not been set yet
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // Generate trial job id(random)
        const trialJobId: string = uniqueString(5);
        const submittedAt: number = Date.now();
        const newJob: RemoteMachineTrialJobDetail =
            new RemoteMachineTrialJobDetail(trialJobId, 'WAITING', submittedAt, "unset", form);

        this.jobQueue.push(trialJobId);
        this.trialJobsMap.set(trialJobId, newJob);

        return newJob;
    }

217
218
219
220
221
    /**
     * Updates a trial job for multi-phase: writes the form's hyper-parameters
     * through writeParameterFile().
     *
     * @param trialJobId trial job id
     * @param form job application form
     * @returns the stored job detail
     * @throws Error if the job id is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existingJob: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (existingJob === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }

        await this.writeParameterFile(trialJobId, form.hyperParameters);

        return existingJob;
    }
231

232
233
234
235
    /**
     * Whether multi-phase jobs are supported by this training service.
     *
     * @returns always `true`
     */
    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

Deshui Yu's avatar
Deshui Yu committed
239
240
241
242
    /**
     * Cancels a trial job.
     *
     * The job is removed from the queue if still queued. An unscheduled job is
     * marked cancelled directly; a scheduled job has its child processes killed
     * and its executor released.
     *
     * @param trialJobId ID of trial job
     * @param isEarlyStopped whether the cancellation is an early stop
     * @throws Error if the job id is unknown
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Remove the job with trialJobId from job queue
        const queuePosition: number = this.jobQueue.indexOf(trialJobId);
        if (queuePosition !== -1) {
            this.jobQueue.splice(queuePosition, 1);
        }

        if (trialJob.rmMeta === undefined) {
            // Job is not scheduled yet, set status to 'USER_CANCELED' directly
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            trialJob.status = getJobCancelStatus(isEarlyStopped);

            return;
        }

        // The trial job is already scheduled: get the executor where it is running.
        const executor = await this.getExecutor(trialJob.id);

        if (trialJob.status === 'UNKNOWN') {
            trialJob.status = 'USER_CANCELED';
            this.releaseTrialResource(trialJob);

            return;
        }

        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        try {
            // Mark the toEarlyStop tag here
            trialJob.isEarlyStopped = isEarlyStopped;
            await executor.killChildProcesses(jobpidPath);
            this.releaseTrialResource(trialJob);
        } catch (error) {
            // Not handle the error since pkill failed will not impact trial job's current status
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
        }
    }

    /**
     * Set cluster metadata.
     *
     * Handled keys:
     *  - NNI_MANAGER_IP: stores the parsed NNI manager IP config
     *  - MACHINE_LIST: connects to the listed machines and creates the GPU scheduler
     *  - TRIAL_CONFIG: validates the trial config and starts copying codeDir to every connected machine
     *  - MULTI_PHASE / VERSION_CHECK: boolean flags, enabled by 'true' or 'True'
     *  - LOG_COLLECTION: stored as given
     *
     * @param key metadata key
     * @param value metadata value (JSON text for the first three keys)
     * @throws Error for an unknown key, an unusable trial config, or a failed codeDir validation
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;
            case TrialConfigMetadataKey.MACHINE_LIST:
                await this.setupConnections(value);
                this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value);
                // JSON.parse never returns undefined, but the JSON text "null" parses to
                // null, which would otherwise crash on the property access below.
                if (remoteMachineTrailConfig === undefined || remoteMachineTrailConfig === null) {
                    throw new Error('trial config parsed failed');
                }
                // codeDir is not a valid directory, throw Error
                if (!fs.lstatSync(remoteMachineTrailConfig.codeDir)
                    .isDirectory()) {
                    throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`);
                }

                try {
                    // Validate to make sure codeDir doesn't have too many files
                    await validateCodeDir(remoteMachineTrailConfig.codeDir);
                    // Copy codeDir to remote machine. The copy promise is stored rather than
                    // awaited here; prepareTrialJob() awaits it before launching on that machine.
                    for (const [rmMeta, executorManager] of this.machineExecutorManagerMap.entries()) {
                        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
                        if (executor !== undefined) {
                            this.machineCopyExpCodeDirPromiseMap.set(
                                rmMeta,
                                executor.copyDirectoryToRemote(remoteMachineTrailConfig.codeDir, executor.getRemoteCodePath(getExperimentId()))
                            );
                        }
                    }
                } catch (error) {
                    this.log.error(error);

                    // Propagate the original Error (type and stack intact) instead of
                    // re-wrapping it; only non-Error values are converted.
                    throw error instanceof Error ? error : new Error(String(error));
                }

                this.trialConfig = remoteMachineTrailConfig;
                break;
            }
            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }

    /**
     * Get cluster metadata. This implementation ignores the key.
     *
     * @param _key metadata key (unused)
     * @returns always the empty string
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        const noMetadata = "";

        return noMetadata;
    }
356

SparkSnail's avatar
SparkSnail committed
357
    /**
     * Stops the service: flags the run loop to exit, then cleans up the
     * remote connections via cleanupConnections().
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping remote machine training service...');

        // Set first so run() and the GPU collectors observe the stop request.
        this.stopping = true;

        await this.cleanupConnections();
    }

    /**
     * Fetches the shell executor for a trial from the trial's executor manager.
     *
     * @param trialId ID of trial job
     * @throws Error if no executor manager has been assigned to the trial
     */
    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        const manager = this.trialExecutorManagerMap.get(trialId);
        if (manager !== undefined) {
            return manager.getExecutor(trialId);
        }

        throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
    }
373

374
375
376
377
    /**
     * Removes the GPU reservation of every job that is neither WAITING nor RUNNING.
     * Does nothing when no GPU scheduler has been created yet.
     */
    private updateGpuReservation(): void {
        const scheduler = this.gpuScheduler;
        if (!scheduler) {
            return;
        }

        for (const [trialJobId, trialJob] of this.trialJobsMap) {
            const holdsReservation: boolean = ['WAITING', 'RUNNING'].includes(trialJob.status);
            if (holdsReservation) {
                continue;
            }
            scheduler.removeGpuReservation(trialJobId, this.trialJobsMap);
        }
    }

SparkSnail's avatar
SparkSnail committed
387
388
389
390
    /**
     * Stops the gpu metric collector process on every connected machine and
     * releases all executors of each machine.
     *
     * Cleanup is best effort and never throws: a failure is logged and the
     * remaining machines are still processed, because this runs while the
     * experiment is stopping.
     */
    private async cleanupConnections(): Promise<void> {
        for (const executorManager of this.machineExecutorManagerMap.values()) {
            // Each machine is handled in isolation so that one unreachable machine
            // does not prevent cleaning up the others.
            try {
                const executor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.log.info(`killing gpu metric collector on ${executor.name}`);
                    const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
                    await executor.killChildProcesses(gpuJobPidPath, true);
                }
            } catch (error) {
                //ignore error, this function is called to cleanup remote connections when experiment is stopping
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }

            // Always release this machine's executors, even if killing the collector failed.
            try {
                executorManager.releaseAllExecutor();
            } catch (error) {
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
        }
    }

Deshui Yu's avatar
Deshui Yu committed
407
    /**
     * Connects to every machine in the given list and initializes it.
     *
     * Machines are connected one after another; initialization of each machine
     * is started as soon as it is reachable and all initializations are awaited
     * together at the end.
     *
     * @param machineList JSON text holding an array of remote machine descriptions
     */
    private async setupConnections(machineList: string): Promise<void> {
        this.log.debug(`Connecting to remote machines: ${machineList}`);
        //TO DO: verify if value's format is wrong, and json parse failed, how to handle error
        const machines: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);

        const pendingInits = [];
        for (const machine of machines) {
            machine.occupiedGpuIndexMap = new Map<number, number>();
            const manager: ExecutorManager = new ExecutorManager(machine);

            this.log.info(`connecting to ${machine.username}@${machine.ip}:${machine.port}`);
            const initExecutor: ShellExecutor = await manager.getExecutor(this.initExecutorId);
            this.log.debug(`reached ${initExecutor.name}`);

            this.machineExecutorManagerMap.set(machine, manager);

            this.log.debug(`initializing ${initExecutor.name}`);
            // Started but not awaited here; collected and awaited after the loop.
            pendingInits.push(this.initRemoteMachineOnConnected(machine, initExecutor));
            this.log.info(`connected to ${initExecutor.name}`);
        }

        await Promise.all(pendingInits);
    }

428
429
    /**
     * Prepares a freshly connected machine: creates the experiment and script
     * folders, starts the GPU metrics collection script, and subscribes to the
     * timer to keep `rmMeta.gpuSummary` updated from the collector's output.
     *
     * @param rmMeta metadata of the machine; its `gpuSummary` is updated by the timer callback
     * @param executor shell executor connected to that machine
     */
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta, executor: ShellExecutor): Promise<void> {
        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        // NOTE(review): presumably the `true` flag recreates the folder - confirm against ShellExecutor.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(false, nniRootDir, `${nniRootDir}/*`, `${nniRootDir}/scripts/*`);

        //Begin to execute gpu_metrics_collection scripts
        // The call is not awaited here.
        const script = executor.generateGpuStatsScript(getExperimentId());
        executor.executeScript(script, false, true);

        // The timer fires faster than a remote read completes, which would cause
        // overlapping runs; this flag lets only one collection run at a time.
        let collecting = false;

        const disposable: Rx.IDisposable = this.timer.subscribe(
            async () => {
                if (collecting) {
                    return;
                }
                collecting = true;
                try {
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
                        rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${rmMeta.ip}`);
                            this.timer.unsubscribe(disposable);
                        }
                    }
                } catch (error) {
                    // A failed read or unparsable output must not escape as an
                    // unhandled rejection; the next tick simply tries again.
                    this.log.warning(`Failed to collect GPU metrics from ${rmMeta.ip}: ${error}`);
                } finally {
                    // Always clear the guard, otherwise one failure would block
                    // every later collection.
                    collecting = false;
                    if (this.stopping) {
                        this.timer.unsubscribe(disposable);
                        this.log.debug(`Stopped GPU collector on ${rmMeta.ip}, since experiment is exiting.`);
                    }
                }
            }
        );
    }

    /**
     * Tries to schedule and launch one queued trial job.
     *
     * @param trialJobId ID of the trial job to prepare
     * @returns `true` when the job was launched or is no longer WAITING (the caller
     *          should drop it from the queue); `false` when no GPU is available
     *          right now and scheduling should be retried later
     * @throws Error if the trial config or the GPU scheduler is not initialized,
     *         or the scheduler returns an unexpected result type
     * @throws NNIError INVALID_JOB_DETAIL if the job id is unknown;
     *         RESOURCE_NOT_AVAILABLE if no machine can ever satisfy the GPU request
     */
    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }

        // If job is not WAITING, don't prepare and report success immediately
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // get an executor from scheduler
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);

        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
            this.log.error(errorMessage);
            // Throwing is the only rejection needed in an async function; no separate
            // promise is rejected, so nothing is left behind as an unhandled rejection.
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        }

        if (rmScheduleResult.resultType === ScheduleResultType.SUCCEED
            && rmScheduleResult.scheduleInfo !== undefined) {
            const rmScheduleInfo: RemoteMachineScheduleInfo = rmScheduleResult.scheduleInfo;

            trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;
            // Wait for the experiment code copy to this machine (started in
            // setClusterMetadata) before launching anything on it.
            const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
            if (copyExpCodeDirPromise !== undefined) {
                await copyExpCodeDirPromise;
            }

            this.allocateExecutorManagerForTrial(trialJobDetail);
            const executor = await this.getExecutor(trialJobDetail.id);

            trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

            await this.launchTrialOnScheduledMachine(
                trialJobId, trialJobDetail.form, rmScheduleInfo);

            trialJobDetail.status = 'RUNNING';
            trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialJobDetail.workingDirectory}`;
            trialJobDetail.startTime = Date.now();

            this.trialJobsMap.set(trialJobId, trialJobDetail);

            return true;
        }

        if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);

            return false;
        }

        // Reject with an Error object rather than a bare string.
        throw new Error(`Invalid schedule resutl type: ${rmScheduleResult.resultType}`);
    }

529
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
530
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
531
        if (this.trialConfig === undefined) {
Deshui Yu's avatar
Deshui Yu committed
532
533
            throw new Error('trial config is not initialized');
        }
chicm-ms's avatar
chicm-ms committed
534
        const cudaVisibleDevice: string = rmScheduleInfo.cudaVisibleDevice;
535
        const executor = await this.getExecutor(trialJobId);
536
537
538
539
540
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

Deshui Yu's avatar
Deshui Yu committed
541
542
        const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId);

543
        await executor.createFolder(executor.joinPath(trialJobDetail.workingDirectory, '.nni'));
Deshui Yu's avatar
Deshui Yu committed
544
545
546

        // RemoteMachineRunShellFormat is the run shell format string,
        // See definition in remoteMachineData.ts
SparkSnail's avatar
SparkSnail committed
547

548
        let cudaVisible: string;
chicm-ms's avatar
chicm-ms committed
549
550
        // Set CUDA_VISIBLE_DEVICES environment variable based on cudaVisibleDevice
        // If no valid cudaVisibleDevice is defined, set CUDA_VISIBLE_DEVICES to empty string to hide GPU device
SparkSnail's avatar
SparkSnail committed
551
552
        // If gpuNum is undefined, will not set CUDA_VISIBLE_DEVICES in script
        if (this.trialConfig.gpuNum === undefined) {
553
            cudaVisible = ""
SparkSnail's avatar
SparkSnail committed
554
        } else {
chicm-ms's avatar
chicm-ms committed
555
            if (typeof cudaVisibleDevice === 'string' && cudaVisibleDevice.length > 0) {
556
                cudaVisible = `CUDA_VISIBLE_DEVICES=${cudaVisibleDevice}`;
SparkSnail's avatar
SparkSnail committed
557
            } else {
558
                cudaVisible = `CUDA_VISIBLE_DEVICES=" "`;
SparkSnail's avatar
SparkSnail committed
559
            }
SparkSnail's avatar
SparkSnail committed
560
        }
561
562
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        if (this.remoteRestServerPort === undefined) {
SparkSnail's avatar
SparkSnail committed
563
564
565
            const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
            this.remoteRestServerPort = restServer.clusterRestServerPort;
        }
566
        const version: string = this.versionCheck ? await getVersion() : '';
567
568
        const runScriptTrialContent: string = executor.generateStartScript(
            trialJobDetail.workingDirectory,
Deshui Yu's avatar
Deshui Yu committed
569
            trialJobId,
SparkSnail's avatar
SparkSnail committed
570
            getExperimentId(),
571
            trialJobDetail.form.sequenceId.toString(),
572
            this.isMultiPhase,
573
            this.trialConfig.command,
SparkSnail's avatar
SparkSnail committed
574
575
            nniManagerIp,
            this.remoteRestServerPort,
576
            version,
577
            this.logCollection, cudaVisible);
Deshui Yu's avatar
Deshui Yu committed
578
579

        //create tmp trial working folder locally.
580
        await execMkdir(path.join(trialLocalTempFolder, '.nni'));
581
582
583

        // Write install_nni.sh, it's not used in Windows platform.
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("install_nni")), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
584
        // Write file content ( run.sh and parameter.cfg ) to local tmp files
585
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("run")), runScriptTrialContent, { encoding: 'utf8' });
chicm-ms's avatar
chicm-ms committed
586
        await this.writeParameterFile(trialJobId, form.hyperParameters);
Deshui Yu's avatar
Deshui Yu committed
587
        // Copy files in codeDir to remote working directory
588
        await executor.copyDirectoryToRemote(trialLocalTempFolder, trialJobDetail.workingDirectory);
Deshui Yu's avatar
Deshui Yu committed
589
        // Execute command in remote machine
590
        executor.executeScript(executor.joinPath(trialJobDetail.workingDirectory, executor.getScriptName("run")), true, true);
Deshui Yu's avatar
Deshui Yu committed
591
592
    }

593
    /**
     * Refresh a trial job's status from the state of its process on the remote machine.
     *
     * When the process referenced by the job's pid file is no longer alive, the trial's
     * `.nni/code` file is read. If its content has the form `<exit code> <timestamp>`,
     * the job's `status` and `endTime` are updated and `releaseTrialResource` is called;
     * otherwise the job is left untouched.
     *
     * @param trialJob trial job to refresh; it is mutated in place
     * @param executor executor used to query the remote machine
     * @returns the same `trialJob` object that was passed in
     * @throws NNIError (from `getJobPidPath`) when `trialJob.id` is not in `trialJobsMap`
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        // Resolved outside the try block on purpose: an unknown job id is reported to the caller.
        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        const trialReturnCodeFilePath: string = executor.joinPath(
            executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJob.id, '.nni', 'code');

        /* eslint-disable require-atomic-updates */
        try {
            const isAlive = await executor.isProcessAlive(jobpidPath);
            // Only inspect the return code once the process of jobpid is not alive any more
            if (!isAlive) {
                const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath);
                this.log.debug(`trailjob ${trialJob.id} return code: ${trialReturnCode}`);
                // Expected content: "<exit code> <timestamp>"; a leading '-' on the code is tolerated.
                const match: RegExpMatchArray | null = trialReturnCode.trim()
                    .match(/^-?(\d+)\s+(\d+)$/);
                if (match !== null) {
                    const code: string = match[1];
                    const timestamp: string = match[2];
                    // Update trial job's status based on result code
                    if (parseInt(code, 10) === 0) {
                        trialJob.status = 'SUCCEEDED';
                    } else if (trialJob.isEarlyStopped === undefined) {
                        // isEarlyStopped is never set, meaning the job was not cancelled by NNI,
                        // so a non-zero exit code is marked as FAILED
                        trialJob.status = 'FAILED';
                    } else {
                        trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                    }
                    trialJob.endTime = parseInt(timestamp, 10);
                    this.releaseTrialResource(trialJob);
                }
                this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
            }
        } catch (error) {
            // A rejection value is not guaranteed to be an Error (it may even be null/undefined),
            // so never dereference `.message` blindly inside the handler.
            const reason: string = error instanceof Error ? error.message : String(error);
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${reason}`);
            // NOT_FOUND leaves the status unchanged (presumably the remote file is not there yet
            // -- TODO confirm); any other failure marks the job as UNKNOWN.
            if (!(error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND)) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */

        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
638
    /**
     * Event emitter held in `this.metricsEmitter`.
     *
     * @returns the service's `metricsEmitter` instance
     */
    public get MetricsEmitter(): EventEmitter {
        return this.metricsEmitter;
    }

642
    /**
     * Build the path of the `jobpid` file inside a trial's `.nni` folder.
     *
     * @param executor executor whose `joinPath` is used to assemble the path
     * @param jobId id of the trial job to look up in `trialJobsMap`
     * @returns `<workingDirectory>/.nni/jobpid` joined with the executor's path rules
     * @throws NNIError with name INVALID_JOB_DETAIL when the job id is unknown
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const jobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (jobDetail !== undefined) {
            return executor.joinPath(jobDetail.workingDirectory, '.nni', 'jobpid');
        }

        throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
    }
chicm-ms's avatar
chicm-ms committed
650

chicm-ms's avatar
chicm-ms committed
651
    /**
     * Write a trial's hyper-parameter file to the local staging folder and copy it
     * to the trial's folder on the remote machine.
     *
     * @param trialJobId id of the trial the parameters belong to
     * @param hyperParameters parameters to persist; `hyperParameters.value` is the file content
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);

        // Remote destination folder: <remote experiment root>/trials/<trialJobId>
        const remoteExperimentRoot: string = executor.getRemoteExperimentRootDir(getExperimentId());
        const remoteTrialDir: string = executor.joinPath(remoteExperimentRoot, 'trials', trialJobId);

        // Local staging folder: <expRootDir>/trials-local/<trialJobId>
        const localTrialDir: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        const paramFileName: string = generateParamFileName(hyperParameters);
        const localParamFile: string = path.join(localTrialDir, paramFileName);
        const remoteParamFile: string = executor.joinPath(remoteTrialDir, paramFileName);

        // Stage the file locally first, then ship it to the remote machine.
        await fs.promises.writeFile(localParamFile, hyperParameters.value, { encoding: 'utf8' });
        await executor.copyFileToRemote(localParamFile, remoteParamFile);
    }
Deshui Yu's avatar
Deshui Yu committed
663
664
665
}

export { RemoteMachineTrainingService };