remoteMachineTrainingService.ts 30.3 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5

'use strict';

6
import * as assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
7
8
9
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
10
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
11
12
import { Deferred } from 'ts-deferred';
import * as component from '../../common/component';
13
import { NNIError, NNIErrorNames, MethodNotImplementedError } from '../../common/errors';
14
import { getExperimentId } from '../../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
15
16
17
import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer';
import {
18
    HyperParameters, NNIManagerIpConfig, TrainingService, TrialJobApplicationForm,
19
    TrialJobDetail, TrialJobMetric, LogType
Deshui Yu's avatar
Deshui Yu committed
20
} from '../../common/trainingService';
21
import {
22
23
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
24
25
} from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
26
import { GPUSummary, ScheduleResultType } from '../common/gpuData';
27
28
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
29
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
30
31
import { GPUScheduler } from './gpuScheduler';
import {
32
33
    ExecutorManager, RemoteMachineMeta,
    RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail
Deshui Yu's avatar
Deshui Yu committed
34
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
35
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
Deshui Yu's avatar
Deshui Yu committed
36
37
38
39

/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
40
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
41
class RemoteMachineTrainingService implements TrainingService {
42
    private readonly initExecutorId = "initConnection";
43
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>; //machine excutor map
44
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
45
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>; //trial excutor map
46
47
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    private readonly expRootDir: string;
48
    private trialConfig: TrialConfig | undefined;
49
    private gpuScheduler?: GPUScheduler;
50
51
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
Deshui Yu's avatar
Deshui Yu committed
52
    private stopping: boolean = false;
53
54
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
55
    private isMultiPhase: boolean = false;
SparkSnail's avatar
SparkSnail committed
56
57
    private remoteRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
58
    private versionCheck: boolean = true;
SparkSnail's avatar
SparkSnail committed
59
    private logCollection: string;
60
    private sshConnectionPromises: any[];
Deshui Yu's avatar
Deshui Yu committed
61
62
63
64

    constructor(@component.Inject timer: ObservableTimer) {
        // Event channel used to publish trial metrics to registered listeners.
        this.metricsEmitter = new EventEmitter();

        // Book-keeping containers; all start empty and are filled as machines
        // are configured and trials are submitted.
        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();
        this.jobQueue = [];
        this.sshConnectionPromises = [];

        // Environment and injected collaborators.
        this.expRootDir = getExperimentRootDir();
        this.timer = timer;
        this.log = getLogger();

        // Log collection stays disabled until the LOG_COLLECTION metadata key overrides it.
        this.logCollection = 'none';
        this.log.info('Construct remote machine training service.');
    }

    /**
     * Loop to launch trial jobs and collect trial metrics
     */
    public async run(): Promise<void> {
        // Start the REST server first, then propagate the version-check setting to it.
        const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');

        // Wait for every pending remote-machine initialization queued by setupConnections().
        if (this.sshConnectionPromises.length > 0) {
            await Promise.all(this.sshConnectionPromises);
            this.log.info('ssh connection initialized!');
            // set sshConnectionPromises to [] to avoid log information duplicated
            this.sshConnectionPromises = [];
        }

        while (!this.stopping) {
            // Stop launching new trials as soon as cleanUp() has flagged the service as stopping.
            while (!this.stopping && this.jobQueue.length > 0) {
                this.updateGpuReservation();
                const trialJobId: string = this.jobQueue[0];
                const prepareResult: boolean = await this.prepareTrialJob(trialJobId);
                if (!prepareResult) {
                    // No GPU resource is available right now;
                    // wait and retry the same job in the next iteration.
                    break;
                }
                // Remove the job by id rather than with shift(): cancelTrialJob() may have
                // already spliced it out of the queue while prepareTrialJob() was awaiting,
                // in which case shift() would drop a different, still-pending job.
                const queueIndex: number = this.jobQueue.indexOf(trialJobId);
                if (queueIndex >= 0) {
                    this.jobQueue.splice(queueIndex, 1);
                }
            }

            // An error reported by the REST server ends the loop with an exception.
            const restServerError = restServer.getErrorMessage;
            if (restServerError !== undefined) {
                this.stopping = true;
                throw new Error(restServerError);
            }
            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
113

SparkSnail's avatar
SparkSnail committed
114
    /**
115
     * give trial an executor
116
     * @param trial remote machine trial job detail
SparkSnail's avatar
SparkSnail committed
117
     */
118
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
        // The trial must already be bound to a machine by the scheduler.
        const machine = trial.rmMeta;
        if (machine === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        // Reuse the manager that was created for that machine in setupConnections().
        const manager = this.machineExecutorManagerMap.get(machine);
        if (manager === undefined) {
            throw new Error(`executorManager not initialized`);
        }

        this.trialExecutorManagerMap.set(trial.id, manager);
    }
128

SparkSnail's avatar
SparkSnail committed
129
130
    /**
     * If a trial is finished, release the connection resource
131
     * @param trial remote machine trial job detail
SparkSnail's avatar
SparkSnail committed
132
     */
133
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
        if (trial.rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        const manager = this.trialExecutorManagerMap.get(trial.id);
        if (manager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trial.id}`);
        }

        // Only the executor is released. The trial -> manager entry is kept on purpose,
        // because nni manager may still send requests for this trial afterwards.
        manager.releaseExecutor(trial.id);
    }
Deshui Yu's avatar
Deshui Yu committed
144
145
146
147

    /**
     * List submitted trial jobs
     */
148
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        // Each job goes through getTrialJob() so that RUNNING/UNKNOWN trials get their
        // status refreshed. Jobs are queried one at a time, in map insertion order.
        const jobs: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(trialJobId));
        }

        // An async method already wraps its return value in a promise,
        // so no Deferred is needed here.
        return jobs;
    }

    /**
     * Get trial job detail information
     * @param trialJobId ID of trial job
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const job: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (job === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }

        //TO DO: add another job status, and design new job status change logic
        // Only RUNNING and UNKNOWN jobs are refreshed; any other status is returned as stored.
        const needsRefresh: boolean = job.status === 'RUNNING' || job.status === 'UNKNOWN';
        if (!needsRefresh) {
            return job;
        }

        if (job.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }

        // Get the executor where the job is running and let it update the job status.
        const executor = await this.getExecutor(job.id);

        return this.updateTrialJobStatus(job, executor);
    }

183
184
185
186
187
188
189
190
191
    /**
     * Get trial job log
     * @param _trialJobId ID of trial job
     * @param _logType 'TRIAL_LOG' | 'TRIAL_STDERR'
     */
    public async getTrialLog(_trialJobId: string, _logType: LogType): Promise<string> {
        // Not implemented for this training service: both arguments are ignored
        // and the returned promise always rejects.
        throw new MethodNotImplementedError();
    }

Deshui Yu's avatar
Deshui Yu committed
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
    /**
     * Add job metrics listener
     * @param listener callback listener
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // Subscribe the callback to the 'metric' event of the internal emitter.
        this.metricsEmitter.addListener('metric', listener);
    }

    /**
     * Remove job metrics listener
     * @param listener callback listener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // Unsubscribe the callback from the 'metric' event of the internal emitter.
        this.metricsEmitter.removeListener('metric', listener);
    }

    /**
     * Submit trial job
     * @param form trial job description form
     */
212
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // Generate trial job id (random). Retry on the unlikely collision with an id
        // already in use, so an existing trial's record is never silently overwritten.
        let trialJobId: string = uniqueString(5);
        while (this.trialJobsMap.has(trialJobId)) {
            trialJobId = uniqueString(5);
        }

        // The job starts as WAITING with a placeholder working directory ("unset");
        // prepareTrialJob() fills in the real one once a machine has been scheduled.
        const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            "unset",
            form
        );

        // Register the detail before queueing the id, so the id is never in the
        // queue without a matching record.
        this.trialJobsMap.set(trialJobId, trialJobDetail);
        this.jobQueue.push(trialJobId);

        // An async method wraps the value in a promise itself; no Promise.resolve needed.
        return trialJobDetail;
    }

233
234
235
236
237
    /**
     * Update trial job for multi-phase
     * @param trialJobId trial job id
     * @param form job application form
     */
238
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existingJob: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (existingJob === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }

        // Write the new hyper-parameters for the trial; the stored detail itself is unchanged.
        await this.writeParameterFile(trialJobId, form.hyperParameters);

        return existingJob;
    }
247

248
249
250
251
    /**
     * Is multiphase job supported in current training service
     */
    public get isMultiPhaseJobSupported(): boolean {
        // Always reports multi-phase support for this training service.
        return true;
    }

Deshui Yu's avatar
Deshui Yu committed
255
256
257
258
    /**
     * Cancel trial job
     * @param trialJobId ID of trial job
     */
QuanluZhang's avatar
QuanluZhang committed
259
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const job: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (job === undefined) {
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Drop the job from the pending queue if it is still waiting there.
        const queuedAt: number = this.jobQueue.indexOf(trialJobId);
        if (queuedAt >= 0) {
            this.jobQueue.splice(queuedAt, 1);
        }

        if (job.rmMeta === undefined) {
            // The job was never scheduled onto a machine: nothing to kill remotely,
            // just record the cancel status. Early stop is not expected in this state.
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            job.status = getJobCancelStatus(isEarlyStopped);

            return;
        }

        // The job is already scheduled: get the executor of the machine it runs on.
        const executor = await this.getExecutor(job.id);

        if (job.status === 'UNKNOWN') {
            // Status cannot be determined; mark as cancelled and free the executor without killing.
            job.status = 'USER_CANCELED';
            this.releaseTrialResource(job);

            return;
        }

        const pidFile: string = this.getJobPidPath(executor, job.id);
        try {
            // Mark the early-stop tag before killing the trial's processes.
            job.isEarlyStopped = isEarlyStopped;
            await executor.killChildProcesses(pidFile);
            this.releaseTrialResource(job);
        } catch (error) {
            // Only logged: a failed kill does not impact the trial job's current status.
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
        }
    }

    /**
     * Set cluster metadata
     * @param key metadata key
302
     * //1. MACHINE_LIST -- create executor of machine list
Deshui Yu's avatar
Deshui Yu committed
303
304
305
306
307
     * //2. TRIAL_CONFIG -- trial configuration
     * @param value metadata value
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;
            case TrialConfigMetadataKey.MACHINE_LIST:
                // Connect to every machine first; the GPU scheduler works on the resulting map.
                await this.setupConnections(value);
                this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value);
                // JSON.parse never yields undefined, but the JSON text "null" parses to null,
                // so both must be rejected before the config is dereferenced.
                if (remoteMachineTrailConfig === undefined || remoteMachineTrailConfig === null) {
                    throw new Error('trial config parsed failed');
                }
                // codeDir is not a valid directory, throw Error
                if (!fs.lstatSync(remoteMachineTrailConfig.codeDir).isDirectory()) {
                    throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`);
                }

                try {
                    // Validate to make sure codeDir doesn't have too many files
                    await validateCodeDir(remoteMachineTrailConfig.codeDir);

                    // Start copying codeDir to every connected machine. The copy promises are not
                    // awaited here; prepareTrialJob() awaits the one for the machine it schedules on.
                    for (const [rmMeta, executorManager] of this.machineExecutorManagerMap.entries()) {
                        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
                        if (executor !== undefined) {
                            const copyPromise: Promise<void> = executor.copyDirectoryToRemote(
                                remoteMachineTrailConfig.codeDir,
                                executor.getRemoteCodePath(getExperimentId())
                            );
                            // Attach a handler so that a failed copy on a machine that is never
                            // scheduled does not surface as an unhandled rejection. The stored
                            // promise still rejects for whoever awaits it later.
                            copyPromise.catch((copyError: unknown) => {
                                this.log.error(`Copy codeDir to ${rmMeta.ip} failed: ${copyError}`);
                            });
                            this.machineCopyExpCodeDirPromiseMap.set(rmMeta, copyPromise);
                        }
                    }
                } catch (error) {
                    this.log.error(error);

                    // Re-throw the original Error so its message and stack are preserved;
                    // only non-Error values are wrapped.
                    throw error instanceof Error ? error : new Error(String(error));
                }

                this.trialConfig = remoteMachineTrailConfig;
                break;
            }
            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }

    /**
     * Get cluster metadata
     * @param key metadata key
     */
369
    public async getClusterMetadata(_key: string): Promise<string> {
        // No metadata is exposed: the key is ignored and an empty string is returned.
        return "";
    }
372

SparkSnail's avatar
SparkSnail committed
373
    /**
374
     * cleanup() has a time out of 10s to clean remote connections
SparkSnail's avatar
SparkSnail committed
375
376
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping remote machine training service...');

        // Raise the stop flag first so the run() loop and the GPU metric timers wind down,
        // then tear down the remote connections.
        this.stopping = true;
        await this.cleanupConnections();
    }

    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        // Look up the manager bound to this trial by allocateExecutorManagerForTrial().
        const manager = this.trialExecutorManagerMap.get(trialId);
        if (manager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
        }

        const executor: ShellExecutor = await manager.getExecutor(trialId);

        return executor;
    }
389

390
391
392
393
    /**
     * Remove the GPU reservation of every trial that is neither waiting nor running
     */
    private updateGpuReservation(): void {
        // Nothing to do until the scheduler has been created (MACHINE_LIST metadata).
        if (!this.gpuScheduler) {
            return;
        }

        for (const [trialJobId, trialJob] of this.trialJobsMap) {
            const stillNeedsGpu: boolean = ['WAITING', 'RUNNING'].includes(trialJob.status);
            if (stillNeedsGpu) {
                continue;
            }
            this.gpuScheduler.removeGpuReservation(trialJobId, this.trialJobsMap);
        }
    }

SparkSnail's avatar
SparkSnail committed
403
404
405
406
    /**
     * stop gpu_metric_collector process in remote machine and remove unused scripts
     */
    private async cleanupConnections(): Promise<void> {
        // Best-effort cleanup, called while the experiment is stopping. Each machine is
        // handled in isolation, so a failure on one machine neither skips the remaining
        // machines nor prevents its own executors from being released.
        for (const executorManager of this.machineExecutorManagerMap.values()) {
            try {
                const executor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.log.info(`killing gpu metric collector on ${executor.name}`);
                    const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
                    await executor.killChildProcesses(gpuJobPidPath, true);
                }
            } catch (error) {
                // Errors are logged and ignored on purpose; cleanup must not abort shutdown.
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }

            try {
                // Release executors even when killing the collector failed.
                executorManager.releaseAllExecutor();
            } catch (error) {
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
        }
    }

Deshui Yu's avatar
Deshui Yu committed
423
    private async setupConnections(machineList: string): Promise<void> {
        this.log.debug(`Connecting to remote machines: ${machineList}`);
        //TO DO: verify if value's format is wrong, and json parse failed, how to handle error
        const machines: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);

        // Machines are connected one after another, in list order.
        for (const machine of machines) {
            // Every machine starts with an empty GPU occupation map.
            machine.occupiedGpuIndexMap = new Map<number, number>();

            const manager: ExecutorManager = new ExecutorManager(machine);
            this.log.info(`connecting to ${machine.username}@${machine.ip}:${machine.port}`);
            const initExecutor: ShellExecutor = await manager.getExecutor(this.initExecutorId);
            this.log.debug(`reached ${initExecutor.name}`);
            this.machineExecutorManagerMap.set(machine, manager);

            // Remote initialization is started but not awaited here; run() awaits the
            // collected promises before it starts scheduling trials.
            this.log.debug(`initializing ${initExecutor.name}`);
            const initialization: Promise<void> = this.initRemoteMachineOnConnected(machine, initExecutor);
            this.sshConnectionPromises.push(initialization);
            this.log.info(`connecting to ${initExecutor.name}`);
        }
    }

441
442
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta, executor: ShellExecutor): Promise<void> {
        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(true, nniRootDir);

        //Begin to execute gpu_metrics_collection scripts
        const script = executor.generateGpuStatsScript(getExperimentId());
        // NOTE(review): intentionally not awaited in the original code — presumably the
        // collector script keeps running in the background; confirm against ShellExecutor.
        executor.executeScript(script, false, true);

        // The timer fires more often than a remote read takes, so at most one collection
        // is allowed in flight at a time; overlapping ticks are skipped.
        let collecting: boolean = false;

        const disposable: Rx.IDisposable = this.timer.subscribe(
            async () => {
                if (collecting) {
                    return;
                }
                collecting = true;
                try {
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
                        rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${rmMeta.ip}`);
                            this.timer.unsubscribe(disposable);
                        }
                    }
                } catch (error) {
                    // A failed read or an unparsable metrics line must not become an
                    // unhandled rejection; log it and try again on a later tick.
                    this.log.error(`Collect GPU metrics on ${rmMeta.ip} failed: ${error}`);
                } finally {
                    // Always clear the guard, otherwise one failure would block all later ticks.
                    collecting = false;
                }

                if (this.stopping) {
                    this.timer.unsubscribe(disposable);
                    this.log.debug(`Stopped GPU collector on ${rmMeta.ip}, since experiment is exiting.`);
                }
            }
        );
    }

    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        // Resolves to true when the job needs no further scheduling (it was launched, or it is
        // no longer WAITING), and to false when no GPU is currently available and the caller
        // should retry later. Rejects when the service is not configured, the job is unknown,
        // the GPU request can never be satisfied, or the scheduler returns an unexpected result.
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }

        // If job is not WAITING, don't prepare it and report it as handled
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // Ask the scheduler for a machine (and GPUs) for this trial
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);

        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
            this.log.error(errorMessage);
            // Throwing is enough to reject the promise of an async method; no separate
            // Deferred is rejected here, which would be left without a handler.
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        }

        if (rmScheduleResult.resultType === ScheduleResultType.SUCCEED
            && rmScheduleResult.scheduleInfo !== undefined) {
            const rmScheduleInfo: RemoteMachineScheduleInfo = rmScheduleResult.scheduleInfo;

            trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;

            // Wait until the experiment code directory has been copied to the chosen machine
            const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
            if (copyExpCodeDirPromise !== undefined) {
                await copyExpCodeDirPromise;
            }

            this.allocateExecutorManagerForTrial(trialJobDetail);
            const executor = await this.getExecutor(trialJobDetail.id);

            trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

            await this.launchTrialOnScheduledMachine(
                trialJobId, trialJobDetail.form, rmScheduleInfo);

            // The trial is marked RUNNING only after it has been launched
            trialJobDetail.status = 'RUNNING';
            trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialJobDetail.workingDirectory}`;
            trialJobDetail.startTime = Date.now();

            this.trialJobsMap.set(trialJobId, trialJobDetail);

            return true;
        }

        if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);

            return false;
        }

        // Unexpected scheduler result: reject with a real Error rather than a bare string
        throw new Error(`Invalid schedule resutl type: ${rmScheduleResult.resultType}`);
    }

542
    /**
     * Stage the files of a trial locally, copy them to the remote working
     * directory of the trial and start the generated run script there.
     *
     * @param trialJobId id of the trial job to launch
     * @param form application form; its hyper parameters are written to the parameter file
     * @param rmScheduleInfo schedule result; supplies the CUDA device list for the trial
     * @throws Error when the trial config is not initialized or the trial job is unknown
     */
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        const cudaVisibleDevice: string = rmScheduleInfo.cudaVisibleDevice;
        const executor = await this.getExecutor(trialJobId);

        const jobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (jobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

        const localTempDir: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        // Make sure the remote `.nni` folder of the trial exists before anything is copied.
        const remoteNniDir = executor.joinPath(jobDetail.workingDirectory, '.nni');
        await executor.createFolder(remoteNniDir);

        // Environment prefix handed to the start script:
        //  - gpuNum undefined        -> empty string, CUDA_VISIBLE_DEVICES is not set in the script
        //  - non-empty device string -> expose exactly those devices
        //  - otherwise               -> CUDA_VISIBLE_DEVICES=" " to hide GPU devices
        let cudaVisible = "";
        if (this.trialConfig.gpuNum !== undefined) {
            const hasDevices: boolean = typeof cudaVisibleDevice === 'string' && cudaVisibleDevice.length > 0;
            cudaVisible = hasDevices
                ? `CUDA_VISIBLE_DEVICES=${cudaVisibleDevice}`
                : `CUDA_VISIBLE_DEVICES=" "`;
        }

        // Prefer the configured NNI manager IP; fall back to the local IPv4 address.
        let nniManagerIp: string;
        if (this.nniManagerIpConfig) {
            nniManagerIp = this.nniManagerIpConfig.nniManagerIp;
        } else {
            nniManagerIp = getIPV4Address();
        }

        // Look up the REST server port once and cache it on the service.
        if (this.remoteRestServerPort === undefined) {
            const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
            this.remoteRestServerPort = restServer.clusterRestServerPort;
        }

        // The version string is only resolved when version checking is enabled.
        const version: string = this.versionCheck ? await getVersion() : '';

        const runScript: string = executor.generateStartScript(
            jobDetail.workingDirectory,
            trialJobId,
            getExperimentId(),
            jobDetail.form.sequenceId.toString(),
            this.isMultiPhase,
            this.trialConfig.command,
            nniManagerIp,
            this.remoteRestServerPort,
            version,
            this.logCollection,
            cudaVisible);

        // Create the local staging folder (including `.nni`) for this trial.
        await execMkdir(path.join(localTempDir, '.nni'));

        // Stage the install script (the original note says it is not used on Windows).
        const installScriptPath: string = path.join(localTempDir, executor.getScriptName("install_nni"));
        await fs.promises.writeFile(installScriptPath, CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });

        // Stage the run script, then the parameter file.
        const runScriptPath: string = path.join(localTempDir, executor.getScriptName("run"));
        await fs.promises.writeFile(runScriptPath, runScript, { encoding: 'utf8' });
        await this.writeParameterFile(trialJobId, form.hyperParameters);

        // Ship the staged folder to the remote working directory of the trial.
        await executor.copyDirectoryToRemote(localTempDir, jobDetail.workingDirectory);

        // Start the run script on the remote machine. Note: the result of
        // executeScript is not awaited here.
        const remoteRunScript = executor.joinPath(jobDetail.workingDirectory, executor.getScriptName("run"));
        executor.executeScript(remoteRunScript, true, true);
    }

606
    /**
     * Refresh the status of a trial job from the remote machine.
     *
     * While the process recorded in the job's pid file is alive the job is
     * returned untouched. Otherwise the remote return-code file is read; when
     * its content looks like "<code> <timestamp>" the status and end time are
     * updated and the trial's resources are released.
     *
     * Errors raised while querying the remote machine never reject the
     * returned promise: a NOT_FOUND NNIError leaves the status as it is, any
     * other error marks the job as 'UNKNOWN'.
     *
     * @param trialJob trial job detail, updated in place
     * @param executor shell executor used to query the remote machine
     * @returns the same trialJob object
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        const pidFilePath: string = this.getJobPidPath(executor, trialJob.id);
        const returnCodeFilePath: string = executor.joinPath(
            executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJob.id, '.nni', 'code');
        /* eslint-disable require-atomic-updates */
        try {
            // Nothing to update while the trial process is still running.
            if (await executor.isProcessAlive(pidFilePath)) {
                return trialJob;
            }

            const rawReturnCode: string = await executor.getRemoteFileContent(returnCodeFilePath);
            this.log.debug(`trailjob ${trialJob.id} return code: ${rawReturnCode}`);

            // Expected content: "<exit code> <timestamp>"; a leading '-' on the code is tolerated.
            const parsed: RegExpExecArray | null = /^-?(\d+)\s+(\d+)$/.exec(rawReturnCode.trim());
            if (parsed !== null) {
                const exitCode: number = parseInt(parsed[1], 10);
                if (exitCode === 0) {
                    trialJob.status = 'SUCCEEDED';
                } else if (trialJob.isEarlyStopped === undefined) {
                    // isEarlyStopped was never set, so NNI did not cancel the trial:
                    // a non-zero exit code means the trial failed.
                    trialJob.status = 'FAILED';
                } else {
                    trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                }
                trialJob.endTime = parseInt(parsed[2], 10);
                this.releaseTrialResource(trialJob);
            }
            this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
        } catch (error) {
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${error.message}`);
            // A NOT_FOUND error keeps the current status; anything else makes it UNKNOWN.
            const isNotFound: boolean = error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND;
            if (!isNotFound) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */

        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
651
    /**
     * Accessor for the event emitter stored in `this.metricsEmitter`.
     *
     * @returns the service's metrics emitter instance
     */
    public get MetricsEmitter(): EventEmitter {
        const { metricsEmitter } = this;

        return metricsEmitter;
    }

655
    /**
     * Build the remote path of the `jobpid` file of a trial job, i.e.
     * `<workingDirectory>/.nni/jobpid` joined with the executor's path rules.
     *
     * @param executor shell executor whose joinPath is used to build the path
     * @param jobId id of the trial job
     * @returns path of the job's pid file
     * @throws NNIError (INVALID_JOB_DETAIL) when the job id is not in trialJobsMap
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const jobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (jobDetail !== undefined) {
            return executor.joinPath(jobDetail.workingDirectory, '.nni', 'jobpid');
        }

        throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
    }
chicm-ms's avatar
chicm-ms committed
663

chicm-ms's avatar
chicm-ms committed
664
    /**
     * Write the hyper parameters of a trial to a file in the local staging
     * folder of the trial, then copy that file into the trial's folder on the
     * remote machine.
     *
     * @param trialJobId id of the trial job the parameters belong to
     * @param hyperParameters hyper parameters; `value` is the file content and
     *        the file name is derived via generateParamFileName
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);

        // Remote and local folders of this trial.
        const remoteExperimentRoot = executor.getRemoteExperimentRootDir(getExperimentId());
        const remoteTrialDir: string = executor.joinPath(remoteExperimentRoot, 'trials', trialJobId);
        const localTrialDir: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        const paramFileName: string = generateParamFileName(hyperParameters);

        // Write locally first, then upload the file under the same name.
        const localParamFile: string = path.join(localTrialDir, paramFileName);
        await fs.promises.writeFile(localParamFile, hyperParameters.value, { encoding: 'utf8' });

        const remoteParamFile = executor.joinPath(remoteTrialDir, paramFileName);
        await executor.copyFileToRemote(localParamFile, remoteParamFile);
    }
Deshui Yu's avatar
Deshui Yu committed
676
677
678
}

export { RemoteMachineTrainingService };