// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5

'use strict';

6
import * as assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
7
8
9
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
10
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
11
12
import { Deferred } from 'ts-deferred';
import * as component from '../../common/component';
13
import { NNIError, NNIErrorNames, MethodNotImplementedError } from '../../common/errors';
14
import { getExperimentId } from '../../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
15
16
17
import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer';
import {
18
    HyperParameters, NNIManagerIpConfig, TrainingService, TrialJobApplicationForm,
19
    TrialJobDetail, TrialJobMetric, LogType
Deshui Yu's avatar
Deshui Yu committed
20
} from '../../common/trainingService';
21
import {
22
23
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
24
25
} from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
26
import { GPUSummary, ScheduleResultType } from '../common/gpuData';
27
28
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
29
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
30
31
import { GPUScheduler } from './gpuScheduler';
import {
32
33
    ExecutorManager, RemoteMachineMeta,
    RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail
Deshui Yu's avatar
Deshui Yu committed
34
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
35
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
Deshui Yu's avatar
Deshui Yu committed
36
37
38
39

/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
40
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
41
class RemoteMachineTrainingService implements TrainingService {
42
    private readonly initExecutorId = "initConnection";
43
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>; //machine excutor map
44
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
45
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>; //trial excutor map
46
47
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    private readonly expRootDir: string;
48
    private trialConfig: TrialConfig | undefined;
49
    private gpuScheduler?: GPUScheduler;
50
51
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
Deshui Yu's avatar
Deshui Yu committed
52
    private stopping: boolean = false;
53
54
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
55
    private isMultiPhase: boolean = false;
SparkSnail's avatar
SparkSnail committed
56
57
    private remoteRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
58
    private versionCheck: boolean = true;
SparkSnail's avatar
SparkSnail committed
59
    private logCollection: string;
60
    private sshConnectionPromises: any[];
Deshui Yu's avatar
Deshui Yu committed
61
62
63
64

    /**
     * Creates the service with empty bookkeeping containers.
     * Only in-memory state is initialised here; no executor is requested.
     * @param timer injected observable timer, stored on the instance
     */
    constructor(@component.Inject timer: ObservableTimer) {
        // Event channel used by the metric listener add/remove methods.
        this.metricsEmitter = new EventEmitter();

        // Lookup tables keyed by trial id or by machine metadata.
        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();

        // Trial ids awaiting scheduling, and pending machine initialisation promises.
        this.jobQueue = [];
        this.sshConnectionPromises = [];

        this.expRootDir = getExperimentRootDir();
        this.timer = timer;
        this.log = getLogger();
        this.logCollection = 'none';

        this.log.info('Construct remote machine training service.');
    }

    /**
     * Loop to launch trial jobs and collect trial metrics
     */
    public async run(): Promise<void> {
        const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');

        const hasPendingConnections: boolean = this.sshConnectionPromises.length > 0;
        if (hasPendingConnections) {
            await Promise.all(this.sshConnectionPromises);
            this.log.info('ssh connection initialized!');
            // Empty the list so a later call does not wait and log again.
            this.sshConnectionPromises = [];

            // The scheduler is built over the machines registered so far.
            this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);

            if (this.trialConfig === undefined) {
                throw new Error("trial config not initialized!");
            }

            // Start copying codeDir to every machine; the promises are stored, not awaited here.
            for (const [machine, manager] of this.machineExecutorManagerMap) {
                const executor: ShellExecutor = await manager.getExecutor(this.initExecutorId);
                if (executor === undefined) {
                    continue;
                }
                const remoteCodePath = executor.getRemoteCodePath(getExperimentId());
                const copyPromise = executor.copyDirectoryToRemote(this.trialConfig.codeDir, remoteCodePath);
                this.machineCopyExpCodeDirPromiseMap.set(machine, copyPromise);
            }
        }

        while (!this.stopping) {
            // Work through the queue head-first until it is empty or the head cannot be placed.
            while (this.jobQueue.length > 0) {
                this.updateGpuReservation();
                const headTrialId: string = this.jobQueue[0];
                const prepared: boolean = await this.prepareTrialJob(headTrialId);
                if (!prepared) {
                    // No GPU resource is available right now; retry on the next pass.
                    break;
                }
                // The head trial has been handled, so drop it from the queue.
                this.jobQueue.shift();
            }

            // An error reported by the REST server stops the loop and is rethrown.
            const restServerError = restServer.getErrorMessage;
            if (restServerError !== undefined) {
                this.stopping = true;
                throw new Error(restServerError);
            }

            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
128

SparkSnail's avatar
SparkSnail committed
129
    /**
130
     * give trial an executor
131
     * @param trial remote machine trial job detail
SparkSnail's avatar
SparkSnail committed
132
     */
133
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
        const machine = trial.rmMeta;
        if (machine === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        // Look up the manager registered for the machine the trial was scheduled on.
        const manager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(machine);
        if (manager === undefined) {
            throw new Error(`executorManager not initialized`);
        }

        // Record the association so later per-trial lookups can find the manager.
        this.trialExecutorManagerMap.set(trial.id, manager);
    }
143

SparkSnail's avatar
SparkSnail committed
144
145
    /**
     * If a trial is finished, release the connection resource
146
     * @param trial remote machine trial job detail
SparkSnail's avatar
SparkSnail committed
147
     */
148
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
        if (trial.rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        const manager = this.trialExecutorManagerMap.get(trial.id);
        if (manager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trial.id}`);
        }

        // Only the executor is released; the entry in trialExecutorManagerMap is kept
        // on purpose, as there may be following requests from nni manager.
        manager.releaseExecutor(trial.id);
    }
Deshui Yu's avatar
Deshui Yu committed
159
160
161
162

    /**
     * List submitted trial jobs
     */
163
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        // Returns: one detail object per entry of trialJobsMap, in map iteration order.
        // Rejects with whatever getTrialJob throws for any of the entries.
        const jobs: TrialJobDetail[] = [];

        // Entries are fetched one after another, preserving the original call order.
        for (const trialJobId of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(trialJobId));
        }

        // An async method already wraps its result in a promise; no Deferred is needed.
        return jobs;
    }

    /**
     * Get trial job detail information
     * @param trialJobId ID of trial job
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const job: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (job === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }

        //TO DO: add another job status, and design new job status change logic
        // Only RUNNING and UNKNOWN jobs are refreshed; any other status is returned as stored.
        const needsRefresh: boolean = job.status === 'RUNNING' || job.status === 'UNKNOWN';
        if (!needsRefresh) {
            return job;
        }

        if (job.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }

        // Fetch the executor assigned to this trial and let updateTrialJobStatus produce the result.
        const executor = await this.getExecutor(job.id);

        return this.updateTrialJobStatus(job, executor);
    }

198
199
200
201
202
203
204
205
206
    /**
     * Get trial job log
     * @param _trialJobId ID of trial job
     * @param _logType 'TRIAL_LOG' | 'TRIAL_STDERR'
     */
    public async getTrialLog(_trialJobId: string, _logType: LogType): Promise<string> {
        // Not implemented in this service: the returned promise always rejects.
        const notImplemented = new MethodNotImplementedError();
        throw notImplemented;
    }

Deshui Yu's avatar
Deshui Yu committed
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
    /**
     * Add job metrics listener
     * @param listener callback listener
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // addListener is the EventEmitter alias of on(); registers for the 'metric' event.
        this.metricsEmitter.addListener('metric', listener);
    }

    /**
     * Remove job metrics listener
     * @param listener callback listener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        // removeListener is the EventEmitter alias of off(); detaches from the 'metric' event.
        this.metricsEmitter.removeListener('metric', listener);
    }

    /**
     * Submit trial job
     * @param form trial job description form
     */
227
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // Random id for the new trial.
        const newTrialId: string = uniqueString(5);

        // The trial starts as WAITING with the placeholder "unset" in the fourth constructor argument.
        const queuedTrial: RemoteMachineTrialJobDetail =
            new RemoteMachineTrialJobDetail(newTrialId, 'WAITING', Date.now(), "unset", form);

        // Enqueue the id for the run loop and register the detail for lookups.
        this.jobQueue.push(newTrialId);
        this.trialJobsMap.set(newTrialId, queuedTrial);

        return queuedTrial;
    }

248
249
250
251
252
    /**
     * Update trial job for multi-phase
     * @param trialJobId trial job id
     * @param form job application form
     */
253
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existingTrial: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (existingTrial === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }

        // Hand the new hyper-parameters to writeParameterFile; the stored detail object is returned as is.
        const nextParameters = form.hyperParameters;
        await this.writeParameterFile(trialJobId, nextParameters);

        return existingTrial;
    }
262

263
264
265
266
    /**
     * Is multiphase job supported in current training service
     */
    public get isMultiPhaseJobSupported(): boolean {
        // Constant capability flag: this getter reports true unconditionally.
        const supported = true;
        return supported;
    }

Deshui Yu's avatar
Deshui Yu committed
270
271
272
273
    /**
     * Cancel trial job
     * @param trialJobId ID of trial job
     */
QuanluZhang's avatar
QuanluZhang committed
274
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        // isEarlyStopped is recorded on the trial before its processes are killed (defaults to false).
        const job: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (job === undefined) {
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Drop the trial from the pending queue if it is still waiting there.
        const queuePosition: number = this.jobQueue.indexOf(trialJobId);
        if (queuePosition >= 0) {
            this.jobQueue.splice(queuePosition, 1);
        }

        if (job.rmMeta === undefined) {
            // Job is not scheduled yet, set status to 'USER_CANCELLED' directly
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            job.status = getJobCancelStatus(isEarlyStopped);

            return;
        }

        // The trial is already scheduled: get the executor it is running on.
        const executor = await this.getExecutor(job.id);

        if (job.status === 'UNKNOWN') {
            // Status cannot be determined, so mark it cancelled without killing anything.
            job.status = 'USER_CANCELED';
            this.releaseTrialResource(job);

            return;
        }

        const pidFilePath: string = this.getJobPidPath(executor, job.id);
        try {
            // Mark the toEarlyStop tag here
            job.isEarlyStopped = isEarlyStopped;
            await executor.killChildProcesses(pidFilePath);
            this.releaseTrialResource(job);
        } catch (error) {
            // Not handle the error since pkill failed will not impact trial job's current status
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
        }
    }

    /**
     * Set culster metadata
     * @param key metadata key
317
     * //1. MACHINE_LIST -- create executor of machine list
Deshui Yu's avatar
Deshui Yu committed
318
319
320
321
322
     * //2. TRIAL_CONFIG -- trial configuration
     * @param value metadata value
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        // Dispatches on the metadata key. Rejects for an unknown key, for a trial config that
        // does not parse to an object with a string codeDir, for a codeDir that is not a
        // directory, and for any error raised by validateCodeDir.
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;
            case TrialConfigMetadataKey.MACHINE_LIST:
                await this.setupConnections(value);
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                const parsedValue: unknown = JSON.parse(value);
                // JSON.parse never yields undefined; the literal "null" yields null and scalars
                // yield primitives, so reject everything that is not an object carrying a
                // string codeDir before it is dereferenced below.
                if (typeof parsedValue !== 'object' || parsedValue === null
                    || typeof (<TrialConfig>parsedValue).codeDir !== 'string') {
                    throw new Error('trial config parsed failed');
                }
                const remoteMachineTrailConfig: TrialConfig = <TrialConfig>parsedValue;

                // codeDir is not a valid directory, throw Error
                if (!fs.lstatSync(remoteMachineTrailConfig.codeDir)
                    .isDirectory()) {
                    throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`);
                }

                try {
                    // Validate to make sure codeDir doesn't have too many files
                    await validateCodeDir(remoteMachineTrailConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    // Rethrow the original Error so its message and stack are kept; only
                    // non-Error values are wrapped.
                    throw error instanceof Error ? error : new Error(String(error));
                }

                this.trialConfig = remoteMachineTrailConfig;
                break;
            }
            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }

    /**
     * Get cluster metadata
     * @param key metadata key
     */
371
    public async getClusterMetadata(_key: string): Promise<string> {
        // The key is ignored; every lookup resolves to the empty string.
        const noMetadata = "";
        return noMetadata;
    }
374

SparkSnail's avatar
SparkSnail committed
375
    /**
376
     * cleanup() has a time out of 10s to clean remote connections
SparkSnail's avatar
SparkSnail committed
377
378
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping remote machine training service...');
        // Raise the flag that the run loop and the GPU collector callback check,
        // then tear down the per-machine connections.
        this.stopping = true;

        return this.cleanupConnections();
    }

    /**
     * Resolve the shell executor for a trial through its assigned executor manager.
     * @param trialId ID of the trial job
     * @returns the executor handed out by the manager for this trial id
     * @throws Error when no executor manager has been assigned to the trial
     */
    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        const manager = this.trialExecutorManagerMap.get(trialId);
        if (manager !== undefined) {
            return manager.getExecutor(trialId);
        }
        throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
    }
391

392
393
394
395
    /**
     * Remove the GPU reservation of trial jobs that are neither waiting nor running
     */
    private updateGpuReservation(): void {
        const scheduler = this.gpuScheduler;
        if (!scheduler) {
            // Nothing to do until a scheduler has been created.
            return;
        }

        this.trialJobsMap.forEach((trial: RemoteMachineTrialJobDetail, trialId: string) => {
            const isWaitingOrRunning: boolean = trial.status === 'WAITING' || trial.status === 'RUNNING';
            if (!isWaitingOrRunning) {
                scheduler.removeGpuReservation(trialId, this.trialJobsMap);
            }
        });
    }

SparkSnail's avatar
SparkSnail committed
405
406
407
408
    /**
     * stop gpu_metric_collector process in remote machine and remove unused scripts
     */
    private async cleanupConnections(): Promise<void> {
        // Best-effort teardown: each machine is handled in isolation, so a failure on one
        // neither skips releasing its own executors nor aborts cleanup of the remaining
        // machines. This method never rejects; failures are only logged.
        for (const executorManager of this.machineExecutorManagerMap.values()) {
            try {
                const executor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.log.info(`killing gpu metric collector on ${executor.name}`);
                    const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
                    await executor.killChildProcesses(gpuJobPidPath, true);
                }
            } catch (error) {
                //ignore error, this function is called to cleanup remote connections when experiment is stopping
                this.log.error(`Cleanup connection exception, error is ${error}`);
            } finally {
                // Always release this machine's executors, even when the kill step failed.
                try {
                    executorManager.releaseAllExecutor();
                } catch (error) {
                    this.log.error(`Cleanup connection exception, error is ${error}`);
                }
            }
        }
    }

Deshui Yu's avatar
Deshui Yu committed
425
    /**
     * Parse the machine list and start initialising every listed machine.
     * The initialisation promises are collected in sshConnectionPromises and are not awaited here.
     * @param machineList JSON text that is parsed and iterated as a list of machine metadata
     */
    private async setupConnections(machineList: string): Promise<void> {
        this.log.debug(`Connecting to remote machines: ${machineList}`);
        //TO DO: verify if value's format is wrong, and json parse failed, how to handle error
        const machines: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);

        for (const machine of machines) {
            const pendingInit: Promise<void> = this.initRemoteMachineOnConnected(machine);
            this.sshConnectionPromises.push(pendingInit);
        }
    }

435
436
437
438
439
440
441
442
443
    /**
     * Prepare one remote machine: obtain its init executor, register its executor manager,
     * create the experiment and script folders, start the GPU stats script, and subscribe
     * a timer callback that keeps rmMeta.gpuSummary up to date.
     * @param rmMeta metadata of the machine; occupiedGpuIndexMap and gpuSummary are written on it
     */
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta): Promise<void> {
        rmMeta.occupiedGpuIndexMap = new Map<number, number>();
        const executorManager: ExecutorManager = new ExecutorManager(rmMeta);
        this.log.info(`connecting to ${rmMeta.username}@${rmMeta.ip}:${rmMeta.port}`);
        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
        this.log.debug(`reached ${executor.name}`);
        this.machineExecutorManagerMap.set(rmMeta, executorManager);
        this.log.debug(`initializing ${executor.name}`);

        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(true, nniRootDir);

        //Begin to execute gpu_metrics_collection scripts
        const script = executor.generateGpuStatsScript(getExperimentId());
        // NOTE(review): the result is not awaited, as in the original; presumably the script
        // keeps running in the background - confirm against ShellExecutor.executeScript.
        executor.executeScript(script, false, true);

        // The timer fires more often than a remote read may take, so only one collection
        // is allowed to be in flight at a time.
        let isCollecting: boolean = false;

        const disposable: Rx.IDisposable = this.timer.subscribe(
            async () => {
                if (isCollecting) {
                    return;
                }
                isCollecting = true;
                try {
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
                        rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${rmMeta.ip}`);
                            this.timer.unsubscribe(disposable);
                        }
                    }
                } catch (error) {
                    // A failed read or an unparsable line must not end collection for good:
                    // log it and let the next timer tick retry.
                    this.log.warning(`Failed to collect GPU metrics on ${rmMeta.ip}: ${error}`);
                } finally {
                    if (this.stopping) {
                        this.timer.unsubscribe(disposable);
                        this.log.debug(`Stopped GPU collector on ${rmMeta.ip}, since experiment is exiting.`);
                    }
                    // Always clear the in-flight flag, even when the collection failed.
                    isCollecting = false;
                }
            }
        );
    }

    /**
     * Try to schedule and launch one queued trial.
     * @param trialJobId ID of the trial job to prepare
     * @returns true when the trial needs no further scheduling (it was launched, or it is no
     *          longer WAITING); false when no GPU is available right now and the caller
     *          should retry later
     * @throws Error when the trial config or GPU scheduler is not initialised, or when the
     *         scheduler returns an unrecognised result type
     * @throws NNIError when the trial is unknown, or when the requested GPU number exceeds
     *         what any machine can offer
     */
    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }

        // If job is not WAITING, don't prepare and report it as handled immediately
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // get an executor from scheduler
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);

        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
            this.log.error(errorMessage);
            // Throwing rejects this async method's promise; there is no separate deferred
            // promise left rejected without a handler.
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        }

        if (rmScheduleResult.resultType === ScheduleResultType.SUCCEED
            && rmScheduleResult.scheduleInfo !== undefined) {
            const rmScheduleInfo: RemoteMachineScheduleInfo = rmScheduleResult.scheduleInfo;

            trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;

            // Wait for the code-directory copy recorded for this machine, if there is one.
            const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
            if (copyExpCodeDirPromise !== undefined) {
                await copyExpCodeDirPromise;
            }

            this.allocateExecutorManagerForTrial(trialJobDetail);
            const executor = await this.getExecutor(trialJobDetail.id);

            trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

            await this.launchTrialOnScheduledMachine(
                trialJobId, trialJobDetail.form, rmScheduleInfo);

            trialJobDetail.status = 'RUNNING';
            trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialJobDetail.workingDirectory}`;
            trialJobDetail.startTime = Date.now();

            this.trialJobsMap.set(trialJobId, trialJobDetail);

            return true;
        }

        if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);

            return false;
        }

        // Reject with an Error object rather than a bare string so callers get a stack trace.
        throw new Error(`Invalid schedule resutl type: ${rmScheduleResult.resultType}`);
    }

544
    /**
     * Prepare a trial's files locally, upload them to the scheduled remote
     * machine and start the trial's run script there.
     *
     * @param trialJobId id of the trial job to launch
     * @param form application form; its `hyperParameters` are written to the parameter file
     * @param rmScheduleInfo schedule result; its `cudaVisibleDevice` selects the GPUs exposed to the trial
     * @throws Error when the trial config is not initialized or the trial job
     *         is missing from `trialJobsMap`
     */
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        const cudaVisibleDevice: string = rmScheduleInfo.cudaVisibleDevice;
        const executor = await this.getExecutor(trialJobId);
        const jobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (jobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

        const localTempDir: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        await executor.createFolder(executor.joinPath(jobDetail.workingDirectory, '.nni'));

        // Decide what the run script does with CUDA_VISIBLE_DEVICES:
        //  - gpuNum undefined: the variable is not set in the script at all;
        //  - a non-empty cudaVisibleDevice: expose exactly those devices;
        //  - otherwise: set it to a blank value so that GPU devices are hidden.
        let cudaVisible = '';
        if (this.trialConfig.gpuNum !== undefined) {
            const hasDevice: boolean = typeof cudaVisibleDevice === 'string' && cudaVisibleDevice.length > 0;
            cudaVisible = hasDevice
                ? `CUDA_VISIBLE_DEVICES=${cudaVisibleDevice}`
                : `CUDA_VISIBLE_DEVICES=" "`;
        }

        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        if (this.remoteRestServerPort === undefined) {
            // Look the REST server port up once and cache it on the service.
            const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
            this.remoteRestServerPort = restServer.clusterRestServerPort;
        }
        const version: string = this.versionCheck ? await getVersion() : '';

        // The run script format is RemoteMachineRunShellFormat,
        // see its definition in remoteMachineData.ts
        const runScriptContent: string = executor.generateStartScript(
            jobDetail.workingDirectory,
            trialJobId,
            getExperimentId(),
            jobDetail.form.sequenceId.toString(),
            this.isMultiPhase,
            this.trialConfig.command,
            nniManagerIp,
            this.remoteRestServerPort,
            version,
            this.logCollection,
            cudaVisible
        );

        // Create the local temporary working folder of the trial.
        await execMkdir(path.join(localTempDir, '.nni'));

        // Write install_nni.sh, it's not used in Windows platform.
        const installScriptPath: string = path.join(localTempDir, executor.getScriptName("install_nni"));
        await fs.promises.writeFile(installScriptPath, CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });

        // Write the run script and the parameter file to the local temp folder.
        const runScriptPath: string = path.join(localTempDir, executor.getScriptName("run"));
        await fs.promises.writeFile(runScriptPath, runScriptContent, { encoding: 'utf8' });
        await this.writeParameterFile(trialJobId, form.hyperParameters);

        // Upload the local temp folder to the remote working directory.
        await executor.copyDirectoryToRemote(localTempDir, jobDetail.workingDirectory);

        // Start the run script on the remote machine; the returned promise is not awaited here.
        const remoteRunScript: string = executor.joinPath(jobDetail.workingDirectory, executor.getScriptName("run"));
        executor.executeScript(remoteRunScript, true, true);
    }

608
    /**
     * Refresh a trial job's status by checking its process on the remote machine.
     *
     * If the process referenced by the job's pid file is no longer alive, the
     * remote return-code file is read. When its content matches
     * `<code> <timestamp>`, the job becomes `SUCCEEDED` for code 0; otherwise it
     * becomes `FAILED` when `isEarlyStopped` was never set, or the status given
     * by `getJobCancelStatus(isEarlyStopped)`. The end time is then recorded
     * and the trial's resources are released.
     *
     * Errors raised while querying the remote machine are logged; any error
     * other than an `NNIError` named `NOT_FOUND` marks the job as `UNKNOWN`.
     *
     * @param trialJob trial job to refresh; it is mutated in place
     * @param executor shell executor used to query the remote machine
     * @returns the same `trialJob` object after the update
     * @throws NNIError from `getJobPidPath` when the job is not in `trialJobsMap`
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        const trialReturnCodeFilePath: string = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJob.id, '.nni', 'code');
        /* eslint-disable require-atomic-updates */
        try {
            const isAlive = await executor.isProcessAlive(jobpidPath);
            // if the process of jobpid is not alive any more
            if (!isAlive) {
                const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath);
                this.log.debug(`trailjob ${trialJob.id} return code: ${trialReturnCode}`);
                const match: RegExpMatchArray | null = trialReturnCode.trim()
                    .match(/^-?(\d+)\s+(\d+)$/);
                if (match !== null) {
                    const { 1: code, 2: timestamp } = match;
                    // Update trial job's status based on result code
                    if (parseInt(code, 10) === 0) {
                        trialJob.status = 'SUCCEEDED';
                    } else if (trialJob.isEarlyStopped === undefined) {
                        // isEarlyStopped is never set, meaning it was not cancelled by NNI,
                        // so a non-zero exit code marks the trial as FAILED
                        trialJob.status = 'FAILED';
                    } else {
                        trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                    }
                    trialJob.endTime = parseInt(timestamp, 10);
                    this.releaseTrialResource(trialJob);
                }
                this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
            }
        } catch (error) {
            // A thrown value is not guaranteed to be an Error; narrow before reading `.message`.
            const message: string = error instanceof Error ? error.message : String(error);
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${message}`);
            // A NOT_FOUND NNIError leaves the status untouched; anything else makes it UNKNOWN.
            if (!(error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND)) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */

        // An async function already wraps its return value in a promise; no Deferred is needed.
        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
653
    /**
     * Accessor exposing this service's `metricsEmitter` instance.
     */
    public get MetricsEmitter(): EventEmitter {
        const { metricsEmitter } = this;

        return metricsEmitter;
    }

657
    /**
     * Build the path of the `jobpid` file inside a trial's `.nni` folder.
     *
     * @param executor shell executor whose `joinPath` is used to build the path
     * @param jobId id of the trial job
     * @returns `<workingDirectory>/.nni/jobpid` joined by the executor
     * @throws NNIError (`INVALID_JOB_DETAIL`) when the job is not in `trialJobsMap`
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const jobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (jobDetail !== undefined) {
            return executor.joinPath(jobDetail.workingDirectory, '.nni', 'jobpid');
        }

        throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
    }
chicm-ms's avatar
chicm-ms committed
665

chicm-ms's avatar
chicm-ms committed
666
    /**
     * Write a trial's hyper-parameters to a local file and copy that file into
     * the trial's folder on the remote machine.
     *
     * @param trialJobId id of the trial job the parameters belong to
     * @param hyperParameters parameters to write; `value` is the file content
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);

        // Remote and local folders of this trial.
        const remoteTrialDir: string = executor.joinPath(
            executor.getRemoteExperimentRootDir(getExperimentId()),
            'trials',
            trialJobId
        );
        const localTrialDir: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        // Write the parameter file locally first, then upload it.
        const paramFileName: string = generateParamFileName(hyperParameters);
        const localParamFile: string = path.join(localTrialDir, paramFileName);
        await fs.promises.writeFile(localParamFile, hyperParameters.value, { encoding: 'utf8' });

        const remoteParamFile: string = executor.joinPath(remoteTrialDir, paramFileName);
        await executor.copyFileToRemote(localParamFile, remoteParamFile);
    }
Deshui Yu's avatar
Deshui Yu committed
678
679
680
}

export { RemoteMachineTrainingService };