remoteMachineTrainingService.ts 29.6 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5

'use strict';

6
import * as assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
7
8
9
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
10
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
11
12
import { Deferred } from 'ts-deferred';
import * as component from '../../common/component';
SparkSnail's avatar
SparkSnail committed
13
import { NNIError, NNIErrorNames } from '../../common/errors';
14
import { getExperimentId } from '../../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
15
16
17
import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer';
import {
18
    HyperParameters, NNIManagerIpConfig, TrainingService, TrialJobApplicationForm,
19
    TrialJobDetail, TrialJobMetric
Deshui Yu's avatar
Deshui Yu committed
20
} from '../../common/trainingService';
21
import {
22
23
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
24
25
} from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
26
import { GPUSummary, ScheduleResultType } from '../common/gpuData';
27
28
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
29
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
30
31
import { GPUScheduler } from './gpuScheduler';
import {
32
33
    ExecutorManager, RemoteMachineMeta,
    RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail
Deshui Yu's avatar
Deshui Yu committed
34
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
35
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
Deshui Yu's avatar
Deshui Yu committed
36
37
38
39

/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
40
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
41
class RemoteMachineTrainingService implements TrainingService {
42
    // Executor id used for the per-machine setup connection opened in setupConnections.
    private readonly initExecutorId = "initConnection";
    // Executor manager of each remote machine.
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>;
    // In-flight copy of the experiment code directory to each remote machine.
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
    // Executor manager assigned to each trial, keyed by trial id.
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>;
    // Every submitted trial, keyed by trial id.
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    private readonly expRootDir: string;
    private trialConfig: TrialConfig | undefined;
    private gpuScheduler?: GPUScheduler;
    // Ids of submitted trials not yet placed on a machine, head first.
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
    // Set by cleanUp(); checked by run() and the GPU metric pollers.
    private stopping: boolean = false;
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
    private isMultiPhase: boolean = false;
    private remoteRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    private versionCheck: boolean = true;
    private logCollection: string;

    constructor(@component.Inject timer: ObservableTimer) {
        this.timer = timer;
        this.jobQueue = [];
        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
        this.metricsEmitter = new EventEmitter();
        this.expRootDir = getExperimentRootDir();
        this.log = getLogger();
        this.logCollection = 'none';
        this.log.info('Construct remote machine training service.');
    }

    /**
     * Main service loop. Starts the REST server trials report to, then every 3 seconds
     * tries to place queued trials on remote machines until cleanUp() sets `stopping`.
     * @throws Error carrying the REST server's error message once it reports one.
     */
    public async run(): Promise<void> {
        const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');

        while (!this.stopping) {
            // Launch queued trials in FIFO order; stop at the first one that cannot be
            // placed yet (no GPU available) and retry it on the next iteration.
            let headScheduled = true;
            while (headScheduled && this.jobQueue.length > 0) {
                this.updateGpuReservation();
                headScheduled = await this.prepareTrialJob(this.jobQueue[0]);
                if (headScheduled) {
                    this.jobQueue.shift();
                }
            }

            if (restServer.getErrorMessage !== undefined) {
                this.stopping = true;
                throw new Error(restServer.getErrorMessage);
            }

            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
105

SparkSnail's avatar
SparkSnail committed
106
    /**
     * Bind a trial to the executor manager of the machine it was scheduled on.
     * @param trial remote machine trial job detail; its `rmMeta` must already be set
     * @throws Error if `rmMeta` is unset or that machine has no executor manager
     */
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
        const { id: trialId, rmMeta } = trial;
        if (rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trialId}`);
        }
        const machineManager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(rmMeta);
        if (machineManager === undefined) {
            throw new Error(`executorManager not initialized`);
        }
        this.trialExecutorManagerMap.set(trialId, machineManager);
    }
120

SparkSnail's avatar
SparkSnail committed
121
122
    /**
     * Release the executor (connection) held by a trial that has finished or been cancelled.
     * The trial's entry in trialExecutorManagerMap is deliberately kept, because nni manager
     * may still send follow-up requests for this trial.
     * @param trial remote machine trial job detail
     * @throws Error if `rmMeta` is unset or no executor manager was assigned to the trial
     */
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
        const trialId = trial.id;
        if (trial.rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trialId}`);
        }
        const assignedManager = this.trialExecutorManagerMap.get(trialId);
        if (assignedManager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
        }
        assignedManager.releaseExecutor(trialId);
    }
Deshui Yu's avatar
Deshui Yu committed
136
137
138
139

    /**
     * List all submitted trial jobs, refreshing each one through getTrialJob.
     * Trials are refreshed one after another, not concurrently.
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const details: TrialJobDetail[] = [];
        for (const trialJobId of this.trialJobsMap.keys()) {
            details.push(await this.getTrialJob(trialJobId));
        }
        return details;
    }

    /**
     * Get trial job detail information.
     * Trials that are RUNNING or UNKNOWN have their status refreshed from the remote
     * machine; any other trial is returned from the local cache as-is.
     * @param trialJobId ID of trial job
     * @throws NNIError NOT_FOUND if the trial does not exist
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }

        // TODO: add another job status, and design new job status change logic
        const needsRefresh = trialJob.status === 'RUNNING' || trialJob.status === 'UNKNOWN';
        if (!needsRefresh) {
            return trialJob;
        }

        if (trialJob.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }
        // Executor of the machine the job is running on.
        const executor = await this.getExecutor(trialJob.id);
        return this.updateTrialJobStatus(trialJob, executor);
    }

    /**
     * Subscribe to trial metrics emitted by this service.
     * @param listener called once per metric event
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.addListener('metric', listener);
    }

    /**
     * Unsubscribe a listener previously passed to addTrialJobMetricListener.
     * @param listener the same function instance that was registered
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.removeListener('metric', listener);
    }

    /**
     * Submit a trial job. The job is recorded as WAITING and appended to the job queue;
     * run() places it on a machine later.
     * @param form trial job description form
     * @throws Error if the trial config has not been set yet
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // Random trial job id.
        const trialJobId: string = uniqueString(5);
        const submitted = new RemoteMachineTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            "unset", // presumably the working-directory placeholder; prepareTrialJob sets workingDirectory later
            form
        );
        this.jobQueue.push(trialJobId);
        this.trialJobsMap.set(trialJobId, submitted);

        return submitted;
    }

216
217
218
219
220
    /**
     * Update a trial for multi-phase by writing a new parameter file for it.
     * @param trialJobId trial job id
     * @param form job application form carrying the new hyper-parameters
     * @throws Error if the trial does not exist
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const existing: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (existing === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        await this.writeParameterFile(trialJobId, form.hyperParameters);

        return existing;
    }
230

231
232
233
234
    /**
     * This training service supports multi-phase trials; updateTrialJob writes a new
     * parameter file for an existing trial.
     */
    public get isMultiPhaseJobSupported(): boolean {
        const supported = true;
        return supported;
    }

Deshui Yu's avatar
Deshui Yu committed
238
239
240
241
    /**
     * Cancel a trial job.
     * A trial that was never scheduled is just marked cancelled. A scheduled trial in UNKNOWN
     * state is marked USER_CANCELED and its resources are released. Otherwise its processes
     * on the remote machine are killed.
     * @param trialJobId ID of trial job
     * @param isEarlyStopped whether the cancellation is an early stop
     * @throws Error if the trial does not exist
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Drop the trial from the pending queue if it is still there.
        const queuePosition: number = this.jobQueue.indexOf(trialJobId);
        if (queuePosition !== -1) {
            this.jobQueue.splice(queuePosition, 1);
        }

        if (trialJob.rmMeta === undefined) {
            // Job is not scheduled yet, set its cancel status directly.
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            trialJob.status = getJobCancelStatus(isEarlyStopped);
            return;
        }

        // The job was scheduled: look up the executor of the machine it was placed on.
        const executor = await this.getExecutor(trialJob.id);

        if (trialJob.status === 'UNKNOWN') {
            trialJob.status = 'USER_CANCELED';
            this.releaseTrialResource(trialJob);
            return;
        }

        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        try {
            // Record the early-stop flag before killing the processes.
            trialJob.isEarlyStopped = isEarlyStopped;
            await executor.killChildProcesses(jobpidPath);
            this.releaseTrialResource(trialJob);
        } catch (error) {
            // A failed kill does not change the trial's recorded status, so only log it.
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
        }
    }

    /**
     * Set cluster metadata.
     * Recognized keys:
     *  - NNI_MANAGER_IP: JSON-encoded NNIManagerIpConfig.
     *  - MACHINE_LIST: JSON machine list; connects to every machine and creates the GPU scheduler.
     *  - TRIAL_CONFIG: JSON-encoded TrialConfig; validates codeDir and starts copying it to every
     *    connected machine. prepareTrialJob awaits a machine's copy before launching a trial there.
     *  - MULTI_PHASE / VERSION_CHECK: 'true' or 'True' enables the flag, anything else disables it.
     *  - LOG_COLLECTION: stored verbatim.
     * @param key metadata key
     * @param value metadata value
     * @throws Error for an unknown key, an unparsable/invalid trial config, or a codeDir that is not a directory
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;
            case TrialConfigMetadataKey.MACHINE_LIST:
                await this.setupConnections(value);
                this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                const remoteMachineTrialConfig: TrialConfig | null = <TrialConfig | null>JSON.parse(value);
                // JSON.parse never yields undefined; a "null" or scalar payload must be rejected here
                // instead of crashing with a TypeError on `.codeDir` below.
                if (remoteMachineTrialConfig === null || typeof remoteMachineTrialConfig !== 'object') {
                    throw new Error('trial config parsed failed');
                }
                const codeDir: string = remoteMachineTrialConfig.codeDir;
                // codeDir is not a valid directory, throw Error
                if (!fs.lstatSync(codeDir).isDirectory()) {
                    throw new Error(`codeDir ${codeDir} is not a directory`);
                }

                try {
                    // Validate to make sure codeDir doesn't have too many files
                    await validateCodeDir(codeDir);
                    // Start copying codeDir to every remote machine; the copies are not awaited here.
                    for (const [rmMeta, executorManager] of this.machineExecutorManagerMap.entries()) {
                        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
                        if (executor !== undefined) {
                            const copyPromise = executor.copyDirectoryToRemote(codeDir, executor.getRemoteCodePath(getExperimentId()));
                            // Attach a handler so a failed copy is logged instead of surfacing as an
                            // unhandled rejection; awaiting copyPromise later still rethrows the failure.
                            void copyPromise.catch((copyError) => {
                                this.log.error(`Failed to copy codeDir to remote machine ${rmMeta.ip}: ${copyError}`);
                            });
                            this.machineCopyExpCodeDirPromiseMap.set(rmMeta, copyPromise);
                        }
                    }
                } catch (error) {
                    this.log.error(error);
                    throw new Error(error);
                }

                this.trialConfig = remoteMachineTrialConfig;
                break;
            }
            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            default:
                // Reject unknown keys
                throw new Error(`Unknown key: ${key}`);
        }
    }

    /**
     * Get cluster metadata. Not implemented for remote machines: always resolves to an empty string.
     * @param _key metadata key (ignored)
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        return '';
    }
355

SparkSnail's avatar
SparkSnail committed
356
    /**
     * Stop the service: flag `stopping` (observed by run() and the GPU metric pollers),
     * then stop the remote GPU collectors and release connections.
     * NOTE(review): an earlier comment said cleanup has a 10s timeout; no timeout is
     * visible in this method or in cleanupConnections — confirm.
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping remote machine training service...');
        this.stopping = true;
        return this.cleanupConnections();
    }

    /**
     * Get the executor for a trial from the executor manager assigned to it.
     * @param trialId trial job id
     * @throws Error if no executor manager has been assigned to the trial
     */
    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        const assignedManager: ExecutorManager | undefined = this.trialExecutorManagerMap.get(trialId);
        if (assignedManager !== undefined) {
            return assignedManager.getExecutor(trialId);
        }
        throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
    }
372

373
374
375
376
    /**
     * Drop GPU reservations of every trial whose status is neither WAITING nor RUNNING.
     * Does nothing until the GPU scheduler exists (MACHINE_LIST has been set).
     */
    private updateGpuReservation(): void {
        const scheduler = this.gpuScheduler;
        if (!scheduler) {
            return;
        }
        const activeStatuses: string[] = ['WAITING', 'RUNNING'];
        this.trialJobsMap.forEach((detail, trialJobId) => {
            if (!activeStatuses.includes(detail.status)) {
                scheduler.removeGpuReservation(trialJobId, this.trialJobsMap);
            }
        });
    }

SparkSnail's avatar
SparkSnail committed
386
387
388
389
    /**
     * Stop the gpu_metric_collector process on every remote machine and release all executors.
     * Best effort, since this runs while the experiment is stopping: a failure on one machine
     * is logged and does not prevent cleanup of the remaining machines, and executors are
     * released even if killing the collector failed.
     */
    private async cleanupConnections(): Promise<void> {
        for (const executorManager of this.machineExecutorManagerMap.values()) {
            try {
                const executor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.log.info(`killing gpu metric collector on ${executor.name}`);
                    const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
                    await executor.killChildProcesses(gpuJobPidPath, true);
                }
            } catch (error) {
                // ignore error, this function is called to cleanup remote connections when experiment is stopping
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
            try {
                executorManager.releaseAllExecutor();
            } catch (error) {
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
        }
    }

Deshui Yu's avatar
Deshui Yu committed
406
    /**
     * Connect to every machine in the MACHINE_LIST metadata and initialize it.
     * Connections are opened one machine at a time; per-machine initialization
     * (initRemoteMachineOnConnected) runs concurrently and is awaited at the end.
     * @param machineList JSON array of RemoteMachineMeta
     * @throws Error if machineList is not a JSON array
     */
    private async setupConnections(machineList: string): Promise<void> {
        const parsed: unknown = JSON.parse(machineList);
        if (!Array.isArray(parsed)) {
            throw new Error('machine list should be a JSON array of remote machine configs');
        }
        const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>parsed;
        // The raw list may contain credentials (e.g. passwords), so only its size is logged here.
        this.log.debug(`Connecting to ${rmMetaList.length} remote machine(s)`);

        const initPromises: Promise<void>[] = [];
        for (const rmMeta of rmMetaList) {
            rmMeta.occupiedGpuIndexMap = new Map<number, number>();
            const executorManager: ExecutorManager = new ExecutorManager(rmMeta);
            this.log.info(`connecting to ${rmMeta.username}@${rmMeta.ip}:${rmMeta.port}`);
            const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
            this.log.debug(`reached ${executor.name}`);
            this.machineExecutorManagerMap.set(rmMeta, executorManager);
            this.log.debug(`initializing ${executor.name}`);
            initPromises.push(this.initRemoteMachineOnConnected(rmMeta, executor));
            this.log.info(`connected to ${executor.name}`);
        }

        await Promise.all(initPromises);
    }

427
428
    /**
     * Prepare a freshly connected machine: create the experiment and script directories,
     * start the GPU metrics collection script, and on each timer tick read the latest
     * metrics line into rmMeta.gpuSummary.
     * Polling stops when the machine reports zero GPUs or when the service is stopping.
     * @param rmMeta machine metadata whose gpuSummary is updated
     * @param executor executor connected to that machine
     */
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta, executor: ShellExecutor): Promise<void> {
        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(true, nniRootDir);

        // Begin to execute gpu_metrics_collection scripts. This call is not awaited.
        // NOTE(review): a failure of this call is not observed here — confirm that is intended.
        const script = executor.generateGpuStatsScript(getExperimentId());
        executor.executeScript(script, false, true);

        // The timer ticks every second, so a slow poll could overlap the next tick;
        // only one poll per machine is allowed at a time.
        let collecting = false;
        const disposable: Rx.IDisposable = this.timer.subscribe(
            async () => {
                if (collecting) {
                    return;
                }
                collecting = true;
                try {
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
                        rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${rmMeta.ip}`);
                            this.timer.unsubscribe(disposable);
                        }
                    }
                } catch (error) {
                    // A failed read or an unparsable metrics line must not wedge the guard
                    // above (which would stop polling forever) or reject the timer callback.
                    this.log.error(`Failed to collect GPU metrics on ${rmMeta.ip}: ${error}`);
                } finally {
                    collecting = false;
                }
                if (this.stopping) {
                    this.timer.unsubscribe(disposable);
                    this.log.debug(`Stopped GPU collector on ${rmMeta.ip}, since experiment is exiting.`);
                }
            }
        );
    }

    /**
     * Try to schedule a queued trial on a remote machine and launch it there.
     * @param trialJobId id of the trial to prepare
     * @returns true if the trial was launched or no longer needs launching (status is not
     *          WAITING); false if no GPU is available right now and scheduling should be retried
     * @throws NNIError RESOURCE_NOT_AVAILABLE if no machine can ever satisfy the requested GPU number
     * @throws Error if prerequisites are missing or the scheduler returns an unexpected result
     */
    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }

        // If job is not WAITING (e.g. it was cancelled while queued), there is nothing to prepare.
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // Ask the scheduler for a machine (and GPUs) to run the trial on.
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);
        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
            this.log.error(errorMessage);
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        } else if (rmScheduleResult.resultType === ScheduleResultType.SUCCEED
            && rmScheduleResult.scheduleInfo !== undefined) {
            const rmScheduleInfo: RemoteMachineScheduleInfo = rmScheduleResult.scheduleInfo;
            trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;

            // Make sure the experiment code directory has finished copying to this machine.
            const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
            if (copyExpCodeDirPromise !== undefined) {
                await copyExpCodeDirPromise;
            }

            this.allocateExecutorManagerForTrial(trialJobDetail);
            const executor = await this.getExecutor(trialJobDetail.id);

            trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

            await this.launchTrialOnScheduledMachine(
                trialJobId, trialJobDetail.form, rmScheduleInfo);

            trialJobDetail.status = 'RUNNING';
            trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialJobDetail.workingDirectory}`;
            trialJobDetail.startTime = Date.now();

            this.trialJobsMap.set(trialJobId, trialJobDetail);
            return true;
        } else if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);
            return false;
        } else {
            throw new Error(`Invalid schedule result type: ${rmScheduleResult.resultType}`);
        }
    }

528
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
529
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
530
        if (this.trialConfig === undefined) {
Deshui Yu's avatar
Deshui Yu committed
531
532
            throw new Error('trial config is not initialized');
        }
chicm-ms's avatar
chicm-ms committed
533
        const cudaVisibleDevice: string = rmScheduleInfo.cudaVisibleDevice;
534
        const executor = await this.getExecutor(trialJobId);
535
536
537
538
539
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

Deshui Yu's avatar
Deshui Yu committed
540
541
        const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId);

542
        await executor.createFolder(executor.joinPath(trialJobDetail.workingDirectory, '.nni'));
Deshui Yu's avatar
Deshui Yu committed
543
544
545

        // RemoteMachineRunShellFormat is the run shell format string,
        // See definition in remoteMachineData.ts
SparkSnail's avatar
SparkSnail committed
546

547
        let cudaVisible: string;
chicm-ms's avatar
chicm-ms committed
548
549
        // Set CUDA_VISIBLE_DEVICES environment variable based on cudaVisibleDevice
        // If no valid cudaVisibleDevice is defined, set CUDA_VISIBLE_DEVICES to empty string to hide GPU device
SparkSnail's avatar
SparkSnail committed
550
551
        // If gpuNum is undefined, will not set CUDA_VISIBLE_DEVICES in script
        if (this.trialConfig.gpuNum === undefined) {
552
            cudaVisible = ""
SparkSnail's avatar
SparkSnail committed
553
        } else {
chicm-ms's avatar
chicm-ms committed
554
            if (typeof cudaVisibleDevice === 'string' && cudaVisibleDevice.length > 0) {
555
                cudaVisible = `CUDA_VISIBLE_DEVICES=${cudaVisibleDevice}`;
SparkSnail's avatar
SparkSnail committed
556
            } else {
557
                cudaVisible = `CUDA_VISIBLE_DEVICES=" "`;
SparkSnail's avatar
SparkSnail committed
558
            }
SparkSnail's avatar
SparkSnail committed
559
        }
560
561
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        if (this.remoteRestServerPort === undefined) {
SparkSnail's avatar
SparkSnail committed
562
563
564
            const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
            this.remoteRestServerPort = restServer.clusterRestServerPort;
        }
565
        const version: string = this.versionCheck ? await getVersion() : '';
566
567
        const runScriptTrialContent: string = executor.generateStartScript(
            trialJobDetail.workingDirectory,
Deshui Yu's avatar
Deshui Yu committed
568
            trialJobId,
SparkSnail's avatar
SparkSnail committed
569
            getExperimentId(),
570
            trialJobDetail.form.sequenceId.toString(),
571
            this.isMultiPhase,
572
            this.trialConfig.command,
SparkSnail's avatar
SparkSnail committed
573
574
            nniManagerIp,
            this.remoteRestServerPort,
575
            version,
576
            this.logCollection, cudaVisible);
Deshui Yu's avatar
Deshui Yu committed
577
578

        //create tmp trial working folder locally.
579
        await execMkdir(path.join(trialLocalTempFolder, '.nni'));
580
581
582

        // Write install_nni.sh, it's not used in Windows platform.
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("install_nni")), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
583
        // Write file content ( run.sh and parameter.cfg ) to local tmp files
584
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("run")), runScriptTrialContent, { encoding: 'utf8' });
chicm-ms's avatar
chicm-ms committed
585
        await this.writeParameterFile(trialJobId, form.hyperParameters);
Deshui Yu's avatar
Deshui Yu committed
586
        // Copy files in codeDir to remote working directory
587
        await executor.copyDirectoryToRemote(trialLocalTempFolder, trialJobDetail.workingDirectory);
Deshui Yu's avatar
Deshui Yu committed
588
        // Execute command in remote machine
589
        executor.executeScript(executor.joinPath(trialJobDetail.workingDirectory, executor.getScriptName("run")), true, true);
Deshui Yu's avatar
Deshui Yu committed
590
591
    }

592
    /**
     * Refresh the status of a remote trial by probing its process and, once the
     * process is no longer alive, reading the exit code and end timestamp file
     * it left in its `.nni` folder.
     *
     * The trial object is mutated in place and the same instance is returned.
     * Errors raised while probing are logged and swallowed (best-effort):
     * an `NNIError` named NOT_FOUND leaves the status untouched, any other
     * error marks the trial as 'UNKNOWN'. An error from `getJobPidPath`
     * (trial not registered) is not caught and rejects the returned promise.
     *
     * @param trialJob the trial whose status should be refreshed
     * @param executor shell executor connected to the machine running the trial
     * @returns the same `trialJob` instance, possibly with updated status/endTime
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        // The '.nni' metadata folder lives under the trial's working directory, which is
        // also where getJobPidPath looks for 'jobpid', so read the exit-code file from there.
        const trialReturnCodeFilePath: string = executor.joinPath(trialJob.workingDirectory, '.nni', 'code');
        /* eslint-disable require-atomic-updates */
        try {
            const isAlive = await executor.isProcessAlive(jobpidPath);
            // Nothing to update while the trial process is still running.
            if (!isAlive) {
                const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath);
                this.log.debug(`trailjob ${trialJob.id} return code: ${trialReturnCode}`);
                // Expected content: "<exit code> <end timestamp>". A leading '-' on the exit code is
                // accepted but not captured; that is harmless because only "=== 0" is tested below.
                const match: RegExpMatchArray | null = trialReturnCode.trim()
                    .match(/^-?(\d+)\s+(\d+)$/);
                if (match !== null) {
                    const { 1: code, 2: timestamp } = match;
                    if (parseInt(code, 10) === 0) {
                        trialJob.status = 'SUCCEEDED';
                    } else if (trialJob.isEarlyStopped === undefined) {
                        // isEarlyStopped was never set, so NNI did not cancel this trial:
                        // a non-zero exit code means the trial itself failed.
                        trialJob.status = 'FAILED';
                    } else {
                        trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                    }
                    trialJob.endTime = parseInt(timestamp, 10);
                    this.releaseTrialResource(trialJob);
                }
                this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
            }
        } catch (error) {
            const message: string = error instanceof Error ? error.message : String(error);
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${message}`);
            // NOT_FOUND is treated as transient: keep the last known status.
            if (!(error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND)) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */

        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
637
    /**
     * Read-only accessor exposing this service's metrics event emitter to callers.
     */
    public get MetricsEmitter(): EventEmitter {
        const emitter: EventEmitter = this.metricsEmitter;
        return emitter;
    }

641
    /**
     * Build the remote path of the 'jobpid' file for a registered trial, located in
     * the '.nni' folder under the trial's working directory.
     *
     * @param executor executor used to join paths in the remote machine's format
     * @param jobId id of the trial, looked up in `trialJobsMap`
     * @returns `<workingDirectory>/.nni/jobpid` in the executor's path syntax
     * @throws NNIError (INVALID_JOB_DETAIL) when the trial is not in `trialJobsMap`
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const detail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (detail !== undefined) {
            return executor.joinPath(detail.workingDirectory, '.nni', 'jobpid');
        }
        throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
    }
chicm-ms's avatar
chicm-ms committed
649

chicm-ms's avatar
chicm-ms committed
650
    /**
     * Serialize a trial's hyper-parameters to a file and upload it to the trial's remote folder.
     *
     * The file name comes from `generateParamFileName`. The file is first written
     * (UTF-8) into the local staging folder `<expRootDir>/trials-local/<trialJobId>`,
     * then copied to `<remote experiment root>/trials/<trialJobId>` under the same name.
     * NOTE(review): the local staging folder is assumed to exist already (the launch
     * path creates it before calling this) — confirm for every caller.
     *
     * @param trialJobId id of the trial the parameters belong to
     * @param hyperParameters parameters whose `value` becomes the file content
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);
        const remoteRoot: string = executor.getRemoteExperimentRootDir(getExperimentId());
        const remoteTrialDir: string = executor.joinPath(remoteRoot, 'trials', trialJobId);

        const paramFileName: string = generateParamFileName(hyperParameters);
        const stagedFile: string = path.join(this.expRootDir, 'trials-local', trialJobId, paramFileName);

        await fs.promises.writeFile(stagedFile, hyperParameters.value, { encoding: 'utf8' });
        const remoteFile: string = executor.joinPath(remoteTrialDir, paramFileName);
        await executor.copyFileToRemote(stagedFile, remoteFile);
    }
Deshui Yu's avatar
Deshui Yu committed
662
663
664
}

export { RemoteMachineTrainingService };