remoteMachineTrainingService.ts 30.2 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5

'use strict';

6
import * as assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
7
8
9
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
10
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
11
12
import { Deferred } from 'ts-deferred';
import * as component from '../../common/component';
13
import { NNIError, NNIErrorNames, MethodNotImplementedError } from '../../common/errors';
14
import { getExperimentId } from '../../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
15
16
17
import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer';
import {
18
    HyperParameters, NNIManagerIpConfig, TrainingService, TrialJobApplicationForm,
19
    TrialJobDetail, TrialJobMetric, LogType
Deshui Yu's avatar
Deshui Yu committed
20
} from '../../common/trainingService';
21
import {
22
23
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
24
25
} from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
26
import { GPUSummary, ScheduleResultType } from '../common/gpuData';
27
28
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
29
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
30
31
import { GPUScheduler } from './gpuScheduler';
import {
32
33
    ExecutorManager, RemoteMachineMeta,
    RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail
Deshui Yu's avatar
Deshui Yu committed
34
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
35
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
Deshui Yu's avatar
Deshui Yu committed
36
37
38
39

/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
40
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
41
class RemoteMachineTrainingService implements TrainingService {
42
    // Id of the executor each machine opens for setup work (initial connection, code copy, cleanup).
    private readonly initExecutorId = "initConnection";
    // Remote machine -> its executor manager.
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>;
    // Remote machine -> promise of the experiment code directory copy to that machine.
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
    // Trial job id -> executor manager of the machine the trial was scheduled on.
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>;
    // Trial job id -> trial job detail.
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    private readonly expRootDir: string;
    private trialConfig: TrialConfig | undefined;
    // Created in run() once the pending ssh connections are established.
    private gpuScheduler?: GPUScheduler;
    // Ids of submitted trial jobs that still have to be launched, in submission order.
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
    // Set by cleanUp(); makes the run() loop exit.
    private stopping: boolean = false;
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
    private isMultiPhase: boolean = false;
    private remoteRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    private versionCheck: boolean = true;
    private logCollection: string;
    // Promises of the machine initializations started by setupConnections(); awaited in run().
    private sshConnectionPromises: any[];
Deshui Yu's avatar
Deshui Yu committed
61
62
63
64

    /**
     * Create the service with empty job and executor bookkeeping.
     * @param timer injected timer; GPU metric polling subscribes to it
     */
    constructor(@component.Inject timer: ObservableTimer) {
        this.timer = timer;
        this.log = getLogger();
        this.expRootDir = getExperimentRootDir();
        this.logCollection = 'none';

        this.metricsEmitter = new EventEmitter();
        this.jobQueue = [];
        this.sshConnectionPromises = [];

        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();

        this.log.info('Construct remote machine training service.');
    }

    /**
     * Main loop of the training service: launches queued trial jobs until `stopping` is set.
     *
     * Starts the REST server, waits for any pending ssh connections (creating the GPU
     * scheduler once they are ready), then every 3 seconds tries to launch the jobs at
     * the head of the queue.
     * @throws Error when the REST server reports an error message
     */
    public async run(): Promise<void> {
        const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');

        if (this.sshConnectionPromises.length > 0) {
            await Promise.all(this.sshConnectionPromises);
            this.log.info('ssh connection initialized!');
            // set sshConnectionPromises to [] to avoid log information duplicated
            this.sshConnectionPromises = [];
            // initialize gpuScheduler
            this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);
        }

        while (!this.stopping) {
            while (this.jobQueue.length > 0) {
                this.updateGpuReservation();
                const trialJobId: string = this.jobQueue[0];
                const prepareResult: boolean = await this.prepareTrialJob(trialJobId);
                if (!prepareResult) {
                    // No GPU resource is available right now; retry in the next iteration.
                    break;
                }
                // Remove this job by id rather than with shift(): the queue may have been
                // modified (e.g. by cancelTrialJob) while prepareTrialJob was awaited, in
                // which case the head of the queue is no longer this job.
                const queueIndex: number = this.jobQueue.indexOf(trialJobId);
                if (queueIndex >= 0) {
                    this.jobQueue.splice(queueIndex, 1);
                }
            }
            if (restServer.getErrorMessage !== undefined) {
                this.stopping = true;
                throw new Error(restServer.getErrorMessage);
            }
            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
115

SparkSnail's avatar
SparkSnail committed
116
    /**
     * Bind a trial to the executor manager of the machine it was scheduled on.
     * @param trial remote machine trial job detail; its rmMeta must already be set
     * @throws Error when rmMeta is not set, or no executor manager exists for that machine
     */
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
        const machine = trial.rmMeta;
        if (machine === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        const machineManager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(machine);
        if (machineManager === undefined) {
            throw new Error(`executorManager not initialized`);
        }

        this.trialExecutorManagerMap.set(trial.id, machineManager);
    }
130

SparkSnail's avatar
SparkSnail committed
131
132
    /**
     * Release the executor held for a finished trial.
     * @param trial remote machine trial job detail
     * @throws Error when rmMeta is not set, or no executor manager is assigned to the trial
     */
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
        if (trial.rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }

        const assignedManager = this.trialExecutorManagerMap.get(trial.id);
        if (assignedManager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trial.id}`);
        }

        // The entry in trialExecutorManagerMap is kept on purpose: nni manager may still
        // send requests for this trial afterwards.
        assignedManager.releaseExecutor(trial.id);
    }
Deshui Yu's avatar
Deshui Yu committed
146
147
148
149

    /**
     * List submitted trial jobs.
     *
     * Every job is fetched through getTrialJob(), one job at a time.
     * @returns details of all jobs known to this service, in map insertion order
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(trialJobId));
        }

        return jobs;
    }

    /**
     * Get trial job detail information.
     *
     * Jobs whose status is 'RUNNING' or 'UNKNOWN' are passed through updateTrialJobStatus()
     * with the trial's executor; any other job is returned as stored.
     * @param trialJobId ID of trial job
     * @throws NNIError (NOT_FOUND) when the id is unknown
     * @throws Error when a running/unknown job has no rmMeta
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }

        //TO DO: add another job status, and design new job status change logic
        const needsRefresh: boolean = trialJob.status === 'RUNNING' || trialJob.status === 'UNKNOWN';
        if (!needsRefresh) {
            return trialJob;
        }

        if (trialJob.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }
        // Get executor where the job is running
        const executor = await this.getExecutor(trialJob.id);

        return this.updateTrialJobStatus(trialJob, executor);
    }

185
186
187
188
189
190
191
192
193
    /**
     * Get trial job log.
     *
     * Not implemented by this training service: the returned promise always rejects.
     * @param _trialJobId ID of trial job (unused)
     * @param _logType 'TRIAL_LOG' | 'TRIAL_STDERR' (unused)
     * @throws MethodNotImplementedError always
     */
    public async getTrialLog(_trialJobId: string, _logType: LogType): Promise<string> {
        throw new MethodNotImplementedError();
    }

Deshui Yu's avatar
Deshui Yu committed
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
    /**
     * Add job metrics listener.
     *
     * The listener is registered for the 'metric' event of the internal emitter.
     * @param listener callback invoked with each emitted trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.on('metric', listener);
    }

    /**
     * Remove job metrics listener.
     *
     * Unregisters the listener from the 'metric' event of the internal emitter.
     * @param listener the same callback that was passed to addTrialJobMetricListener
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.off('metric', listener);
    }

    /**
     * Submit trial job.
     *
     * The job is only registered and queued here with status 'WAITING'; it is launched
     * later by the run() loop.
     * @param form trial job description form
     * @returns detail of the newly queued job
     * @throws Error when the trial config has not been set yet
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // Generate trial job id(random)
        const trialJobId: string = uniqueString(5);
        const submitTime: number = Date.now();

        // "unset" is a placeholder (presumably the working directory, which
        // prepareTrialJob assigns once a machine is chosen).
        const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail(
            trialJobId, 'WAITING', submitTime, "unset", form);

        this.jobQueue.push(trialJobId);
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        return trialJobDetail;
    }

235
236
237
238
239
    /**
     * Update trial job for multi-phase: writes the form's hyper parameters for an existing job.
     * @param trialJobId trial job id
     * @param form job application form carrying the new hyper parameters
     * @returns the stored detail of the job
     * @throws Error when the job id is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existingJob: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
        if (existingJob === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }

        await this.writeParameterFile(trialJobId, form.hyperParameters);

        return existingJob;
    }
249

250
251
252
253
    /**
     * Is multiphase job supported in current training service.
     * @returns always true for this training service
     */
    public get isMultiPhaseJobSupported(): boolean {
        return true;
    }

Deshui Yu's avatar
Deshui Yu committed
257
258
259
260
    /**
     * Cancel trial job.
     *
     * The job is always removed from the waiting queue. If it was already scheduled on a
     * machine, its processes are killed there and its executor is released; otherwise only
     * its status is changed.
     * @param trialJobId ID of trial job
     * @param isEarlyStopped whether the cancellation is an early stop; recorded on the job
     *        before its processes are killed. Must be false for a job that is not scheduled yet.
     * @throws Error when the job id is unknown
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Remove the job with trialJobId from job queue
        const queueIndex: number = this.jobQueue.indexOf(trialJobId);
        if (queueIndex >= 0) {
            this.jobQueue.splice(queueIndex, 1);
        }

        if (trialJob.rmMeta === undefined) {
            // Job is not scheduled yet, set its cancel status directly
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            trialJob.status = getJobCancelStatus(isEarlyStopped);

            return;
        }

        // The trial job is already scheduled: get the executor where it is running
        const executor = await this.getExecutor(trialJob.id);

        if (trialJob.status === 'UNKNOWN') {
            // Status is unknown: mark the job as canceled and free its executor without killing anything
            trialJob.status = 'USER_CANCELED';
            this.releaseTrialResource(trialJob);

            return;
        }

        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        try {
            // Mark the toEarlyStop tag here
            trialJob.isEarlyStopped = isEarlyStopped;
            await executor.killChildProcesses(jobpidPath);
            this.releaseTrialResource(trialJob);
        } catch (error) {
            // Not handle the error since pkill failed will not impact trial job's current status
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
        }
    }

    /**
     * Set cluster metadata.
     *
     * Handled keys:
     *  - NNI_MANAGER_IP: JSON of the nni manager ip config
     *  - MACHINE_LIST: JSON machine list; starts connecting to every machine
     *  - TRIAL_CONFIG: JSON trial configuration; validates codeDir and starts copying it
     *    to every machine that already has an executor manager
     *  - MULTI_PHASE / VERSION_CHECK: 'true' or 'True' enables the flag, anything else disables it
     *  - LOG_COLLECTION: stored as is
     * @param key metadata key
     * @param value metadata value
     * @throws Error for unknown keys, an unusable trial config, or a codeDir that fails validation
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;
            case TrialConfigMetadataKey.MACHINE_LIST:
                await this.setupConnections(value);
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value);
                // JSON.parse throws on malformed input and never returns undefined, but the
                // text 'null' parses to null, which must be rejected here as well.
                if (remoteMachineTrailConfig === undefined || remoteMachineTrailConfig === null) {
                    throw new Error('trial config parsed failed');
                }
                // codeDir is not a valid directory, throw Error
                const codeDirStat = fs.lstatSync(remoteMachineTrailConfig.codeDir);
                if (!codeDirStat.isDirectory()) {
                    throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`);
                }

                try {
                    // Validate to make sure codeDir doesn't have too many files
                    await validateCodeDir(remoteMachineTrailConfig.codeDir);
                    // Copy codeDir to remote machine. The copy is only started here; its
                    // promise is stored per machine and awaited before a trial is launched.
                    for (const [rmMeta, executorManager] of this.machineExecutorManagerMap.entries()) {
                        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
                        if (executor !== undefined) {
                            this.machineCopyExpCodeDirPromiseMap.set(
                                rmMeta,
                                executor.copyDirectoryToRemote(remoteMachineTrailConfig.codeDir, executor.getRemoteCodePath(getExperimentId()))
                            );
                        }
                    }
                } catch (error) {
                    this.log.error(error);
                    // Propagate the original error so its message and stack are preserved;
                    // only non-Error values are wrapped.
                    throw error instanceof Error ? error : new Error(String(error));
                }

                this.trialConfig = remoteMachineTrailConfig;
                break;
            }
            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            default:
                //Reject for unknown keys
                throw new Error(`Uknown key: ${key}`);
        }
    }

    /**
     * Get cluster metadata.
     *
     * This service does not expose any metadata: the result is an empty string for every key.
     * @param _key metadata key (unused)
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        return "";
    }
373

SparkSnail's avatar
SparkSnail committed
374
    /**
     * Stop the service: flag the run() loop to exit, then clean up the remote connections.
     *
     * NOTE(review): the previous comment mentioned a 10s timeout; no timeout is applied
     * in this method itself.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping remote machine training service...');
        this.stopping = true;
        await this.cleanupConnections();
    }

    /**
     * Get the shell executor of a trial from the executor manager assigned to it.
     * @param trialId trial job id
     * @throws Error when no executor manager has been assigned to the trial
     */
    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        const assignedManager = this.trialExecutorManagerMap.get(trialId);
        if (assignedManager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
        }

        return await assignedManager.getExecutor(trialId);
    }
390

391
392
393
394
    /**
     * Remove the GPU reservation of every trial job that is neither waiting nor running.
     * Does nothing while the GPU scheduler has not been created yet.
     */
    private updateGpuReservation(): void {
        const scheduler = this.gpuScheduler;
        if (!scheduler) {
            return;
        }

        for (const [trialJobId, trialJob] of this.trialJobsMap) {
            const stillActive: boolean = trialJob.status === 'WAITING' || trialJob.status === 'RUNNING';
            if (!stillActive) {
                scheduler.removeGpuReservation(trialJobId, this.trialJobsMap);
            }
        }
    }

SparkSnail's avatar
SparkSnail committed
404
405
406
407
    /**
     * Stop the gpu metric collector process on every remote machine and release all executors.
     *
     * Errors are logged and never propagated, because this runs while the experiment is
     * stopping. Each machine is cleaned up independently, so a failure on one machine does
     * not prevent the remaining machines from being cleaned up.
     */
    private async cleanupConnections(): Promise<void> {
        for (const executorManager of this.machineExecutorManagerMap.values()) {
            try {
                try {
                    const executor = await executorManager.getExecutor(this.initExecutorId);
                    if (executor !== undefined) {
                        this.log.info(`killing gpu metric collector on ${executor.name}`);
                        const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
                        await executor.killChildProcesses(gpuJobPidPath, true);
                    }
                } finally {
                    // Release the machine's executors even when stopping the collector failed.
                    executorManager.releaseAllExecutor();
                }
            } catch (error) {
                //ignore error, this function is called to cleanup remote connections when experiment is stopping
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
        }
    }

Deshui Yu's avatar
Deshui Yu committed
424
    /**
     * Start connecting to every machine of the list.
     *
     * The initializations are only started here; their promises are collected in
     * sshConnectionPromises and awaited by run().
     * @param machineList JSON text of a RemoteMachineMeta array
     */
    private async setupConnections(machineList: string): Promise<void> {
        this.log.debug(`Connecting to remote machines: ${machineList}`);
        //TO DO: verify if value's format is wrong, and json parse failed, how to handle error
        const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);

        for (const rmMeta of rmMetaList) {
            const pendingInit = this.initRemoteMachineOnConnected(rmMeta);
            this.sshConnectionPromises.push(pendingInit);
        }
    }

434
435
436
437
438
439
440
441
442
    /**
     * Connect to a remote machine, prepare its working directories, start the GPU metric
     * collection script and subscribe to the timer to poll the collected GPU summary.
     * @param rmMeta machine to initialize; its occupiedGpuIndexMap is reset and its
     *        gpuSummary is updated by the polling callback
     */
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta): Promise<void> {
        rmMeta.occupiedGpuIndexMap = new Map<number, number>();
        const executorManager: ExecutorManager = new ExecutorManager(rmMeta);
        this.log.info(`connecting to ${rmMeta.username}@${rmMeta.ip}:${rmMeta.port}`);
        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
        this.log.debug(`reached ${executor.name}`);
        this.machineExecutorManagerMap.set(rmMeta, executorManager);
        this.log.debug(`initializing ${executor.name}`);

        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(true, nniRootDir);

        //Begin to execute gpu_metrics_collection scripts
        // NOTE(review): the call is not awaited — presumably the collector is meant to keep
        // running in the background; confirm.
        const script = executor.generateGpuStatsScript(getExperimentId());
        executor.executeScript(script, false, true);

        // The timer fires every second, which could start several overlapping reads on the
        // server. This flag lets only one read run at a time.
        let collecting: boolean = false;

        const disposable: Rx.IDisposable = this.timer.subscribe(
            async () => {
                if (collecting) {
                    return;
                }
                collecting = true;
                try {
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
                        rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${rmMeta.ip}`);
                            this.timer.unsubscribe(disposable);
                        }
                    }
                    if (this.stopping) {
                        this.timer.unsubscribe(disposable);
                        this.log.debug(`Stopped GPU collector on ${rmMeta.ip}, since experiment is exiting.`);
                    }
                } finally {
                    // Always clear the flag: otherwise one failed read or parse would block
                    // every later poll for this machine.
                    collecting = false;
                }
            }
        );
    }

    /**
     * Try to schedule and launch a waiting trial job.
     * @param trialJobId id of the job to prepare
     * @returns true when the job was launched or is no longer waiting (nothing to prepare);
     *          false when no GPU is available right now and scheduling should be retried later
     * @throws Error when the trial config or the GPU scheduler is not initialized, or the
     *         scheduler returns an unexpected result
     * @throws NNIError INVALID_JOB_DETAIL when the job id is unknown, RESOURCE_NOT_AVAILABLE
     *         when no machine can satisfy the required GPU number
     */
    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }
        // If job is not WAITING, don't prepare and report it as handled
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // get an executor from scheduler
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);

        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
            this.log.error(errorMessage);
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        }

        if (rmScheduleResult.resultType === ScheduleResultType.SUCCEED
            && rmScheduleResult.scheduleInfo !== undefined) {
            const rmScheduleInfo: RemoteMachineScheduleInfo = rmScheduleResult.scheduleInfo;

            trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;
            // Wait until the code directory copy to this machine (if one was started) has finished
            const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
            if (copyExpCodeDirPromise !== undefined) {
                await copyExpCodeDirPromise;
            }

            this.allocateExecutorManagerForTrial(trialJobDetail);
            const executor = await this.getExecutor(trialJobDetail.id);

            trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

            await this.launchTrialOnScheduledMachine(
                trialJobId, trialJobDetail.form, rmScheduleInfo);

            trialJobDetail.status = 'RUNNING';
            trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialJobDetail.workingDirectory}`;
            trialJobDetail.startTime = Date.now();

            this.trialJobsMap.set(trialJobId, trialJobDetail);

            return true;
        }

        if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);

            return false;
        }

        // Unexpected scheduler result: fail with a real Error instead of a bare string
        throw new Error(`Invalid schedule resutl type: ${rmScheduleResult.resultType}`);
    }

543
    /**
     * Prepare the start script and parameter file of a trial and launch it on the
     * remote machine the trial has been scheduled to.
     *
     * Steps: create the remote '.nni' folder, build the start script through the
     * executor, write the scripts and the parameter file into a local temp folder,
     * copy that folder to the remote working directory and start the run script.
     *
     * @param trialJobId id of the trial; it must already be registered in `trialJobsMap`
     * @param form application form whose `hyperParameters` are written as the parameter file
     * @param rmScheduleInfo schedule result; its `cudaVisibleDevice` drives the CUDA_VISIBLE_DEVICES setting
     * @throws Error if the trial config is not initialized or no job detail exists for `trialJobId`
     */
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        const cudaVisibleDevice: string = rmScheduleInfo.cudaVisibleDevice;
        const executor = await this.getExecutor(trialJobId);
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

        const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        await executor.createFolder(executor.joinPath(trialJobDetail.workingDirectory, '.nni'));

        let cudaVisible: string;
        // Set CUDA_VISIBLE_DEVICES environment variable based on cudaVisibleDevice
        // If no valid cudaVisibleDevice is defined, set CUDA_VISIBLE_DEVICES to empty string to hide GPU device
        // If gpuNum is undefined, will not set CUDA_VISIBLE_DEVICES in script
        if (this.trialConfig.gpuNum === undefined) {
            cudaVisible = "";
        } else if (typeof cudaVisibleDevice === 'string' && cudaVisibleDevice.length > 0) {
            cudaVisible = `CUDA_VISIBLE_DEVICES=${cudaVisibleDevice}`;
        } else {
            cudaVisible = `CUDA_VISIBLE_DEVICES=" "`;
        }

        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        // Resolve the REST server port once and cache it on the service for later launches.
        if (this.remoteRestServerPort === undefined) {
            const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
            this.remoteRestServerPort = restServer.clusterRestServerPort;
        }
        // An empty version string is passed to the script when version checking is disabled.
        const version: string = this.versionCheck ? await getVersion() : '';
        const runScriptTrialContent: string = executor.generateStartScript(
            trialJobDetail.workingDirectory,
            trialJobId,
            getExperimentId(),
            trialJobDetail.form.sequenceId.toString(),
            this.isMultiPhase,
            this.trialConfig.command,
            nniManagerIp,
            this.remoteRestServerPort,
            version,
            this.logCollection, cudaVisible);

        //create tmp trial working folder locally.
        await execMkdir(path.join(trialLocalTempFolder, '.nni'));

        // Write install_nni.sh, it's not used in Windows platform.
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("install_nni")), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
        // Write file content ( run.sh and parameter.cfg ) to local tmp files
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("run")), runScriptTrialContent, { encoding: 'utf8' });
        await this.writeParameterFile(trialJobId, form.hyperParameters);
        // Copy the local temp folder of this trial to the remote working directory
        await executor.copyDirectoryToRemote(trialLocalTempFolder, trialJobDetail.workingDirectory);
        // Execute command in remote machine.
        // The script is deliberately not awaited, but a rejection handler is attached
        // so that a failure to start it is logged instead of surfacing as an
        // unhandled promise rejection.
        void executor.executeScript(executor.joinPath(trialJobDetail.workingDirectory, executor.getScriptName("run")), true, true)
            .catch((error: unknown) => {
                const reason: string = error instanceof Error ? error.message : String(error);
                this.log.error(`Failed to execute run script of trial ${trialJobId}: ${reason}`);
            });
    }

607
    /**
     * Refresh the status of a trial job by inspecting the remote machine.
     *
     * When the process referenced by the job pid file is no longer alive, the remote
     * '.nni/code' file is read and matched against "<exit code> <timestamp>".
     * On a match the trial status and end time are updated and the trial's
     * resources are released; without a match the trial is left untouched.
     *
     * Errors raised while querying the remote machine are logged and do not reject:
     * an NNIError named NOT_FOUND leaves the status as it is, any other error sets
     * the status to 'UNKNOWN'.
     *
     * @param trialJob trial job detail to update in place
     * @param executor shell executor used to query the remote machine
     * @returns the same `trialJob` object that was passed in
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        const pidFile: string = this.getJobPidPath(executor, trialJob.id);
        const returnCodeFile: string = executor.joinPath(
            executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJob.id, '.nni', 'code');
        /* eslint-disable require-atomic-updates */
        try {
            const processRunning = await executor.isProcessAlive(pidFile);
            // Only a finished process has a return code worth reading.
            if (!processRunning) {
                const rawReturnCode: string = await executor.getRemoteFileContent(returnCodeFile);
                this.log.debug(`trailjob ${trialJob.id} return code: ${rawReturnCode}`);
                // The optional leading '-' is matched but not captured, so only the
                // magnitude of the exit code is parsed.
                const parsed = /^-?(\d+)\s+(\d+)$/.exec(rawReturnCode.trim());
                if (parsed !== null) {
                    const exitCode: number = parseInt(parsed[1], 10);
                    if (exitCode === 0) {
                        trialJob.status = 'SUCCEEDED';
                    } else if (trialJob.isEarlyStopped === undefined) {
                        // isEarlyStopped was never set, so NNI did not cancel the trial:
                        // a non-zero exit code means the trial failed on its own.
                        trialJob.status = 'FAILED';
                    } else {
                        trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                    }
                    trialJob.endTime = parseInt(parsed[2], 10);
                    this.releaseTrialResource(trialJob);
                }
                this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
            }
        } catch (error) {
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${error.message}`);
            const isNotFound: boolean = error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND;
            if (!isNotFound) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */

        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
652
    /**
     * Accessor for the event emitter held in `metricsEmitter`.
     *
     * @returns the service's `metricsEmitter` instance
     */
    public get MetricsEmitter(): EventEmitter {
        return this.metricsEmitter;
    }

656
    /**
     * Build the path of the 'jobpid' file inside the '.nni' folder of a trial's
     * working directory.
     *
     * @param executor shell executor whose `joinPath` is used to assemble the path
     * @param jobId id of the trial job to look up in `trialJobsMap`
     * @returns the joined path `<workingDirectory>/.nni/jobpid`
     * @throws NNIError (INVALID_JOB_DETAIL) when no job detail is registered for `jobId`
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const jobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (jobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
        }

        const workingDir = jobDetail.workingDirectory;

        return executor.joinPath(workingDir, '.nni', 'jobpid');
    }
chicm-ms's avatar
chicm-ms committed
664

chicm-ms's avatar
chicm-ms committed
665
    /**
     * Write the hyper-parameter value of a trial to a local file and copy that
     * file into the trial's folder on the remote machine.
     *
     * The local file is placed under `<expRootDir>/trials-local/<trialJobId>`; that
     * folder is not created here.
     *
     * @param trialJobId id of the trial the parameters belong to
     * @param hyperParameters parameters to persist; `value` is the file content and
     *                        the file name is derived by `generateParamFileName`
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);

        const remoteExpRoot = executor.getRemoteExperimentRootDir(getExperimentId());
        const remoteTrialDir: string = executor.joinPath(remoteExpRoot, 'trials', trialJobId);
        const localTrialDir: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        const paramFileName: string = generateParamFileName(hyperParameters);
        const localParamFile: string = path.join(localTrialDir, paramFileName);

        // Persist locally first, then ship the very same file to the remote trial folder.
        await fs.promises.writeFile(localParamFile, hyperParameters.value, { encoding: 'utf8' });

        const remoteParamFile = executor.joinPath(remoteTrialDir, paramFileName);
        await executor.copyFileToRemote(localParamFile, remoteParamFile);
    }
Deshui Yu's avatar
Deshui Yu committed
677
678
679
}

export { RemoteMachineTrainingService };