remoteMachineTrainingService.ts 30.5 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5

'use strict';

6
import * as assert from 'assert';
Deshui Yu's avatar
Deshui Yu committed
7
8
9
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
10
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
Deshui Yu's avatar
Deshui Yu committed
11
12
import { Deferred } from 'ts-deferred';
import * as component from '../../common/component';
13
import { NNIError, NNIErrorNames, MethodNotImplementedError } from '../../common/errors';
14
import { getExperimentId } from '../../common/experimentStartupInfo';
Deshui Yu's avatar
Deshui Yu committed
15
16
17
import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer';
import {
18
    HyperParameters, NNIManagerIpConfig, TrainingService, TrialJobApplicationForm,
19
    TrialJobDetail, TrialJobMetric, LogType
Deshui Yu's avatar
Deshui Yu committed
20
} from '../../common/trainingService';
21
import {
22
23
    delay, generateParamFileName, getExperimentRootDir, getIPV4Address, getJobCancelStatus,
    getVersion, uniqueString
24
25
} from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
26
import { GPUSummary, ScheduleResultType } from '../common/gpuData';
27
28
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
29
import { execMkdir, validateCodeDir } from '../common/util';
Deshui Yu's avatar
Deshui Yu committed
30
31
import { GPUScheduler } from './gpuScheduler';
import {
32
33
    ExecutorManager, RemoteMachineMeta,
    RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail
Deshui Yu's avatar
Deshui Yu committed
34
} from './remoteMachineData';
SparkSnail's avatar
SparkSnail committed
35
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
Deshui Yu's avatar
Deshui Yu committed
36
37
38
39

/**
 * Training Service implementation for Remote Machine (Linux)
 */
SparkSnail's avatar
SparkSnail committed
40
@component.Singleton
Deshui Yu's avatar
Deshui Yu committed
41
class RemoteMachineTrainingService implements TrainingService {
42
    // Executor id used for machine-level (non-trial) work: code copy, GPU collector, cleanup.
    private readonly initExecutorId = "initConnection";
    // One executor manager per connected machine (filled by initRemoteMachineOnConnected).
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>;
    // Pending copy of codeDir to each machine; prepareTrialJob awaits it before launching a trial there.
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
    // Trial id -> executor manager of the machine the trial was scheduled on.
    private readonly trialExecutorManagerMap: Map<string, ExecutorManager>;
    // Every submitted trial, keyed by trial id.
    private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
    // Experiment root directory, as returned by getExperimentRootDir().
    private readonly expRootDir: string;
    private trialConfig: TrialConfig | undefined;
    // Created in run() once all machine connections are established.
    private gpuScheduler?: GPUScheduler;
    // Ids of submitted trials not launched yet, consumed in FIFO order by run().
    private readonly jobQueue: string[];
    private readonly timer: ObservableTimer;
    // Set by cleanUp() to make the run() loop exit.
    private stopping: boolean = false;
    private readonly metricsEmitter: EventEmitter;
    private readonly log: Logger;
    private isMultiPhase: boolean = false;
    private remoteRestServerPort?: number;
    private nniManagerIpConfig?: NNIManagerIpConfig;
    private versionCheck: boolean = true;
    private logCollection: string;
    // Per-machine connection/initialization promises created by setupConnections and awaited in run().
    private sshConnectionPromises: Promise<void>[];
Deshui Yu's avatar
Deshui Yu committed
61
62
63
64

    /**
     * Creates the service with empty bookkeeping collections.
     * @param timer shared observable timer; later subscribed to for polling GPU metrics on each machine
     */
    constructor(@component.Inject timer: ObservableTimer) {
        this.timer = timer;
        this.jobQueue = [];
        this.sshConnectionPromises = [];
        this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
        this.trialExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
        this.metricsEmitter = new EventEmitter();
        this.logCollection = 'none';
        this.expRootDir = getExperimentRootDir();
        this.log = getLogger();
        this.log.info('Construct remote machine training service.');
    }

    /**
     * Main loop of the service.
     * Starts the trial REST server. If machine connections are pending, waits for them,
     * creates the GPU scheduler and starts copying codeDir to every machine. Then it
     * repeatedly launches queued trials, pausing with delay(3000) between passes, until
     * cleanUp() sets the stopping flag or the REST server reports an error.
     * @throws Error if the trial config is missing or the REST server reports an error
     */
    public async run(): Promise<void> {
        const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
        await restServer.start();
        restServer.setEnableVersionCheck = this.versionCheck;
        this.log.info('Run remote machine training service.');

        if (this.sshConnectionPromises.length > 0) {
            await Promise.all(this.sshConnectionPromises);
            this.log.info('ssh connection initialized!');
            // Empty the list so this initialization (and its log line) is not repeated.
            this.sshConnectionPromises = [];

            this.gpuScheduler = new GPUScheduler(this.machineExecutorManagerMap);

            if (this.trialConfig === undefined) {
                throw new Error("trial config not initialized!");
            }
            // Start copying codeDir to each machine without awaiting it here;
            // prepareTrialJob awaits the copy before launching a trial on that machine.
            for (const [machine, manager] of this.machineExecutorManagerMap) {
                const initExecutor: ShellExecutor = await manager.getExecutor(this.initExecutorId);
                if (initExecutor === undefined) {
                    continue;
                }
                const remoteCodePath = initExecutor.getRemoteCodePath(getExperimentId());
                this.machineCopyExpCodeDirPromiseMap.set(
                    machine,
                    initExecutor.copyDirectoryToRemote(this.trialConfig.codeDir, remoteCodePath)
                );
            }
        }

        while (!this.stopping) {
            // Launch queued trials in FIFO order. Stop at the first trial that cannot be
            // placed yet (prepareTrialJob returned false) and retry it on the next pass.
            let scheduled = true;
            while (scheduled && this.jobQueue.length > 0) {
                this.updateGpuReservation();
                scheduled = await this.prepareTrialJob(this.jobQueue[0]);
                if (scheduled) {
                    this.jobQueue.shift();
                }
            }

            const restServerError = restServer.getErrorMessage;
            if (restServerError !== undefined) {
                this.stopping = true;
                throw new Error(restServerError);
            }

            await delay(3000);
        }
        this.log.info('RemoteMachineTrainingService run loop exited.');
    }
128

SparkSnail's avatar
SparkSnail committed
129
    /**
     * Binds a trial to the executor manager of the machine it was scheduled on, so that
     * later per-trial executor lookups (getExecutor) resolve to that machine.
     * @param trial remote machine trial job detail; its rmMeta must already be set
     * @throws Error if rmMeta is unset or the machine has no executor manager
     */
    public allocateExecutorManagerForTrial(trial: RemoteMachineTrialJobDetail): void {
        const machine = trial.rmMeta;
        if (machine === undefined) {
            throw new Error(`rmMeta not set in trial ${trial.id}`);
        }
        const manager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(machine);
        if (manager === undefined) {
            throw new Error(`executorManager not initialized`);
        }
        this.trialExecutorManagerMap.set(trial.id, manager);
    }
143

SparkSnail's avatar
SparkSnail committed
144
145
    /**
     * Releases the executor held by a trial that no longer needs one.
     * The trial -> executor manager mapping is deliberately kept, because NNI manager
     * may still send requests for this trial afterwards.
     * @param trial remote machine trial job detail
     * @throws Error if rmMeta is unset or no executor manager was assigned to the trial
     */
    public releaseTrialResource(trial: RemoteMachineTrialJobDetail): void {
        const trialId = trial.id;
        if (trial.rmMeta === undefined) {
            throw new Error(`rmMeta not set in trial ${trialId}`);
        }
        const manager = this.trialExecutorManagerMap.get(trialId);
        if (manager === undefined) {
            throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
        }
        manager.releaseExecutor(trialId);
    }
Deshui Yu's avatar
Deshui Yu committed
159
160
161
162

    /**
     * List all submitted trial jobs.
     * Each job goes through getTrialJob, so RUNNING/UNKNOWN jobs get their status refreshed.
     * @returns details of every trial tracked by this service
     */
    public async listTrialJobs(): Promise<TrialJobDetail[]> {
        const jobs: TrialJobDetail[] = [];
        // Queried one at a time: each lookup may acquire a remote executor for the trial.
        for (const trialJobId of this.trialJobsMap.keys()) {
            jobs.push(await this.getTrialJob(trialJobId));
        }
        return jobs;
    }

    /**
     * Get trial job detail information.
     * RUNNING and UNKNOWN jobs are passed through updateTrialJobStatus together with the
     * trial's executor; jobs in any other status are returned as stored.
     * @param trialJobId ID of trial job
     * @throws NNIError NOT_FOUND if the id is unknown
     * @throws Error if a RUNNING/UNKNOWN job has no rmMeta
     */
    public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }
        // TODO: add another job status, and redesign the job status transition logic.
        const needsRefresh: boolean = trialJob.status === 'RUNNING' || trialJob.status === 'UNKNOWN';
        if (!needsRefresh) {
            return trialJob;
        }
        if (trialJob.rmMeta === undefined) {
            throw new Error(`rmMeta not set for submitted job ${trialJobId}`);
        }
        const executor = await this.getExecutor(trialJob.id);
        return this.updateTrialJobStatus(trialJob, executor);
    }

198
199
200
201
202
203
204
205
206
    /**
     * Trial log retrieval is not supported by the remote machine training service.
     * @param _trialJobId ID of trial job (unused)
     * @param _logType 'TRIAL_LOG' | 'TRIAL_STDERR' (unused)
     * @throws MethodNotImplementedError always
     */
    public async getTrialLog(_trialJobId: string, _logType: LogType): Promise<string> {
        const notImplemented = new MethodNotImplementedError();
        throw notImplemented;
    }

Deshui Yu's avatar
Deshui Yu committed
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
    /**
     * Register a callback on the 'metric' event of the internal metrics emitter.
     * @param listener callback receiving each trial job metric
     */
    public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.addListener('metric', listener);
    }

    /**
     * Unregister a callback previously added with addTrialJobMetricListener.
     * @param listener the same callback instance that was registered
     */
    public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
        this.metricsEmitter.removeListener('metric', listener);
    }

    /**
     * Submit trial job.
     * The job is recorded as WAITING and appended to the job queue; the run() loop
     * launches it once a machine can be scheduled for it.
     * @param form trial job description form
     * @returns the newly created trial job detail
     * @throws Error if the trial config has not been set
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // Generate a random trial job id
        const trialJobId: string = uniqueString(5);

        const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail(
            trialJobId,
            'WAITING',
            Date.now(),
            // Placeholder (presumably the working directory); prepareTrialJob assigns
            // workingDirectory once a machine has been chosen.
            "unset",
            form
        );
        this.jobQueue.push(trialJobId);
        this.trialJobsMap.set(trialJobId, trialJobDetail);

        // Async function: the value is wrapped in a promise automatically.
        return trialJobDetail;
    }

248
249
250
251
252
    /**
     * Update trial job for multi-phase: writes new hyper-parameters for an existing trial.
     * @param trialJobId trial job id
     * @param form job application form carrying the new hyper-parameters
     * @returns the (unchanged) trial job detail
     * @throws Error if the trial id is unknown
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const detail: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (detail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        const { hyperParameters } = form;
        await this.writeParameterFile(trialJobId, hyperParameters);
        return detail;
    }
262

263
264
265
266
    /**
     * Whether this training service supports multi-phase trial jobs; always true here.
     */
    public get isMultiPhaseJobSupported(): boolean {
        const supported: boolean = true;
        return supported;
    }

Deshui Yu's avatar
Deshui Yu committed
270
271
272
273
    /**
     * Cancel trial job.
     * - A still-queued trial is removed from the job queue.
     * - A trial never scheduled on a machine (no rmMeta) gets its cancel status set directly.
     * - A scheduled trial in UNKNOWN state is marked USER_CANCELED and its executor released.
     * - Any other scheduled trial has its remote process tree killed and its executor released;
     *   a failed kill is only logged.
     * @param trialJobId ID of trial job
     * @param isEarlyStopped whether the cancellation comes from early stopping
     * @throws Error if the trial id is unknown
     */
    public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
        const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJob === undefined) {
            throw new Error(`trial job id ${trialJobId} not found`);
        }

        // Drop the job from the pending queue if it is still there.
        const queuePosition: number = this.jobQueue.indexOf(trialJobId);
        if (queuePosition !== -1) {
            this.jobQueue.splice(queuePosition, 1);
        }

        if (trialJob.rmMeta === undefined) {
            // Not scheduled yet: no remote process exists, so only the status changes.
            assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
            trialJob.status = getJobCancelStatus(isEarlyStopped);
            return;
        }

        // Scheduled on a machine: fetch the executor the trial runs with.
        const executor = await this.getExecutor(trialJob.id);

        if (trialJob.status === 'UNKNOWN') {
            trialJob.status = 'USER_CANCELED';
            this.releaseTrialResource(trialJob);
            return;
        }

        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        try {
            // Record the early-stop flag before killing the process.
            trialJob.isEarlyStopped = isEarlyStopped;
            await executor.killChildProcesses(jobpidPath);
            this.releaseTrialResource(trialJob);
        } catch (error) {
            // A failed kill does not change the trial's current status, so it is only logged.
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`);
        }
    }

    /**
     * Set cluster metadata.
     * @param key metadata key, one of:
     *   NNI_MANAGER_IP -- JSON NNIManagerIpConfig
     *   MACHINE_LIST   -- JSON list of remote machines; starts connecting to each of them
     *   TRIAL_CONFIG   -- JSON trial configuration; codeDir must be an existing directory
     *   MULTI_PHASE / VERSION_CHECK -- 'true' or 'True' enables, anything else disables
     *   LOG_COLLECTION -- log collection mode string
     *   REMOTE_CONFIG  -- accepted and ignored here (handled by the remote environment service)
     * @param value metadata value
     * @throws Error for an unknown key or an invalid trial config
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;
            case TrialConfigMetadataKey.MACHINE_LIST:
                await this.setupConnections(value);
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                const remoteMachineTrialConfig: TrialConfig = <TrialConfig>JSON.parse(value);
                // JSON.parse throws on malformed text but never returns undefined; it can return
                // null or a primitive (e.g. for "null" or "5"), which would crash on `.codeDir` below.
                if (typeof remoteMachineTrialConfig !== 'object' || remoteMachineTrialConfig === null) {
                    throw new Error('trial config parsed failed');
                }
                // codeDir must be an existing directory
                if (!fs.lstatSync(remoteMachineTrialConfig.codeDir).isDirectory()) {
                    throw new Error(`codeDir ${remoteMachineTrialConfig.codeDir} is not a directory`);
                }
                try {
                    // Validate to make sure codeDir doesn't have too many files
                    await validateCodeDir(remoteMachineTrialConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    // Rethrow Errors as-is: `new Error(error)` would stringify the message to
                    // "Error: <msg>" and drop the original stack.
                    throw error instanceof Error ? error : new Error(String(error));
                }

                this.trialConfig = remoteMachineTrialConfig;
                break;
            }
            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            case TrialConfigMetadataKey.REMOTE_CONFIG:
                // remote_config is consumed by remoteEnvironmentService to set reuse mode;
                // it must be accepted here, otherwise the default branch rejects it as unknown.
                break;
            default:
                // Reject unknown keys
                throw new Error(`Unknown key: ${key}`);
        }
    }

    /**
     * Get cluster metadata. Not supported by this service: always resolves to an empty string.
     * @param _key metadata key (ignored)
     */
    public async getClusterMetadata(_key: string): Promise<string> {
        const noMetadata: string = "";
        return noMetadata;
    }
378

SparkSnail's avatar
SparkSnail committed
379
    /**
     * Stop the service: set the stopping flag so the run() loop exits, then kill the
     * GPU metric collectors and release executors on every machine (cleanupConnections).
     */
    public async cleanUp(): Promise<void> {
        this.log.info('Stopping remote machine training service...');
        this.stopping = true;
        await this.cleanupConnections();
    }

    /**
     * Get the executor for a trial from the executor manager assigned to it.
     * @param trialId trial job id
     * @throws Error if no executor manager has been assigned to the trial
     */
    private async getExecutor(trialId: string): Promise<ShellExecutor> {
        const manager: ExecutorManager | undefined = this.trialExecutorManagerMap.get(trialId);
        if (manager !== undefined) {
            return manager.getExecutor(trialId);
        }
        throw new Error(`ExecutorManager is not assigned for trial ${trialId}`);
    }
395

396
397
398
399
    /**
     * Remove the GPU reservation of every trial that is neither WAITING nor RUNNING.
     * Does nothing until the GPU scheduler has been created in run().
     */
    private updateGpuReservation(): void {
        const scheduler = this.gpuScheduler;
        if (scheduler === undefined) {
            return;
        }
        const activeStatuses: string[] = ['WAITING', 'RUNNING'];
        for (const [trialId, detail] of this.trialJobsMap) {
            if (activeStatuses.indexOf(detail.status) < 0) {
                scheduler.removeGpuReservation(trialId, this.trialJobsMap);
            }
        }
    }

SparkSnail's avatar
SparkSnail committed
409
410
411
412
    /**
     * Stop the gpu_metric_collector process on every remote machine and release all executors.
     * Called while the experiment is stopping, so it is best effort: failures are logged, never thrown.
     * Each machine is handled independently. A failure on one machine neither skips the
     * remaining machines nor prevents that machine's own executors from being released.
     */
    private async cleanupConnections(): Promise<void> {
        for (const executorManager of this.machineExecutorManagerMap.values()) {
            try {
                const executor = await executorManager.getExecutor(this.initExecutorId);
                if (executor !== undefined) {
                    this.log.info(`killing gpu metric collector on ${executor.name}`);
                    const gpuJobPidPath: string = executor.joinPath(executor.getRemoteScriptsPath(getExperimentId()), 'pid');
                    await executor.killChildProcesses(gpuJobPidPath, true);
                }
            } catch (error) {
                // ignore error, this function is called to cleanup remote connections when experiment is stopping
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
            try {
                // Release even when the kill above failed.
                executorManager.releaseAllExecutor();
            } catch (error) {
                this.log.error(`Cleanup connection exception, error is ${error}`);
            }
        }
    }

Deshui Yu's avatar
Deshui Yu committed
429
    /**
     * Start connecting to every machine in the given machine list.
     * The connection promises are stored and awaited later by run(), not here.
     * @param machineList JSON-encoded array of RemoteMachineMeta
     */
    private async setupConnections(machineList: string): Promise<void> {
        this.log.debug(`Connecting to remote machines: ${machineList}`);
        // TODO: validate the format of machineList and handle JSON parse failures.
        const machines: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);
        for (const machine of machines) {
            const connected: Promise<void> = this.initRemoteMachineOnConnected(machine);
            this.sshConnectionPromises.push(connected);
        }
    }

439
440
441
442
443
444
445
446
447
    /**
     * Connect to one remote machine and prepare it for trials.
     * It performs these steps in order:
     * - registers the machine's executor manager;
     * - creates the remote experiment and script directories;
     * - starts the GPU metrics collector script;
     * - subscribes to the shared timer so the latest GPU summary is read into rmMeta.gpuSummary.
     * @param rmMeta metadata of the remote machine
     */
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta): Promise<void> {
        rmMeta.occupiedGpuIndexMap = new Map<number, number>();
        const executorManager: ExecutorManager = new ExecutorManager(rmMeta);
        this.log.info(`connecting to ${rmMeta.username}@${rmMeta.ip}:${rmMeta.port}`);
        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
        this.log.debug(`reached ${executor.name}`);
        this.machineExecutorManagerMap.set(rmMeta, executorManager);
        this.log.debug(`initializing ${executor.name}`);

        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(true, nniRootDir);

        // Begin to execute gpu_metrics_collection scripts.
        // Not awaited; presumably the collector keeps running in the background.
        const script = executor.generateGpuStatsScript(getExperimentId());
        executor.executeScript(script, false, true);

        // The timer fires frequently and could start overlapping reads on this machine;
        // this flag lets only one collection run at a time.
        let collecting: boolean = false;

        const disposable: Rx.IDisposable = this.timer.subscribe(
            async () => {
                if (collecting) {
                    return;
                }
                collecting = true;
                try {
                    const cmdresult = await executor.readLastLines(executor.joinPath(remoteGpuScriptCollectorDir, 'gpu_metrics'));
                    if (cmdresult !== "") {
                        rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult);
                        if (rmMeta.gpuSummary.gpuCount === 0) {
                            this.log.warning(`No GPU found on remote machine ${rmMeta.ip}`);
                            this.timer.unsubscribe(disposable);
                        }
                    }
                } catch (error) {
                    // A failed read or an unparsable metrics line must not disable collection
                    // forever (the guard used to stay set); log it and retry on the next tick.
                    this.log.error(`Failed to collect GPU metrics on ${rmMeta.ip}: ${error}`);
                } finally {
                    collecting = false;
                    if (this.stopping) {
                        this.timer.unsubscribe(disposable);
                        this.log.debug(`Stopped GPU collector on ${rmMeta.ip}, since experiment is exiting.`);
                    }
                }
            }
        );
    }

    /**
     * Try to place one queued trial on a remote machine and launch it.
     * @param trialJobId id of the trial to prepare
     * @returns true if the trial was launched or needs no launching (status is not WAITING);
     *          false if no GPU is available right now and scheduling should be retried later
     * @throws NNIError RESOURCE_NOT_AVAILABLE if no machine can satisfy the requested GPU number
     * @throws NNIError INVALID_JOB_DETAIL if the trial id is unknown
     * @throws Error if config/scheduler are missing or the schedule result is unexpected
     */
    private async prepareTrialJob(trialJobId: string): Promise<boolean> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.gpuScheduler === undefined) {
            throw new Error('gpuScheduler is not initialized');
        }

        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
        }

        // If the job is not WAITING, don't prepare it; report it as handled.
        if (trialJobDetail.status !== 'WAITING') {
            return true;
        }

        // Ask the scheduler for a machine (and GPUs) for this trial.
        // Results are returned/thrown directly: the previous Deferred-based version rejected a
        // promise nobody awaited in the REQUIRE_EXCEED_TOTAL branch (an unhandled rejection).
        const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);
        if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
            const errorMessage: string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
            this.log.error(errorMessage);
            throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
        }
        if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
            this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);
            return false;
        }
        if (rmScheduleResult.resultType !== ScheduleResultType.SUCCEED || rmScheduleResult.scheduleInfo === undefined) {
            throw new Error(`Invalid schedule result type: ${rmScheduleResult.resultType}`);
        }

        const rmScheduleInfo: RemoteMachineScheduleInfo = rmScheduleResult.scheduleInfo;
        trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;

        // Make sure the experiment code has finished copying to the chosen machine.
        const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
        if (copyExpCodeDirPromise !== undefined) {
            await copyExpCodeDirPromise;
        }

        this.allocateExecutorManagerForTrial(trialJobDetail);
        const executor = await this.getExecutor(trialJobDetail.id);

        trialJobDetail.workingDirectory = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobDetail.id);

        await this.launchTrialOnScheduledMachine(trialJobId, trialJobDetail.form, rmScheduleInfo);

        trialJobDetail.status = 'RUNNING';
        trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialJobDetail.workingDirectory}`;
        trialJobDetail.startTime = Date.now();

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        return true;
    }

548
    /**
     * Launch a trial on the machine it was scheduled to.
     *
     * Builds the trial's start script, stages it together with the install_nni
     * script and the parameter file in a local temp folder
     * (`<expRootDir>/trials-local/<trialJobId>`), copies that folder to the
     * trial's remote working directory and starts the run script there.
     *
     * @param trialJobId id of the trial to launch
     * @param form application form whose hyper-parameters are written to the parameter file
     * @param rmScheduleInfo schedule result; supplies the CUDA_VISIBLE_DEVICES device list
     * @throws Error when the trial config is not initialized or the trial job detail is missing
     */
    private async launchTrialOnScheduledMachine(trialJobId: string, form: TrialJobApplicationForm,
        rmScheduleInfo: RemoteMachineScheduleInfo): Promise<void> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        const cudaVisibleDevice: string = rmScheduleInfo.cudaVisibleDevice;
        const executor = await this.getExecutor(trialJobId);
        const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Can not get trial job detail for job: ${trialJobId}`);
        }

        const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId);

        await executor.createFolder(executor.joinPath(trialJobDetail.workingDirectory, '.nni'));

        // CUDA_VISIBLE_DEVICES assignment handed to the start script:
        // - gpuNum undefined: leave the variable unset (empty string)
        // - a non-empty device list was scheduled: expose exactly those devices
        // - otherwise: set it to a single space so no GPU is visible to the trial
        let cudaVisible: string;
        if (this.trialConfig.gpuNum === undefined) {
            cudaVisible = "";
        } else if (typeof cudaVisibleDevice === 'string' && cudaVisibleDevice.length > 0) {
            cudaVisible = `CUDA_VISIBLE_DEVICES=${cudaVisibleDevice}`;
        } else {
            cudaVisible = `CUDA_VISIBLE_DEVICES=" "`;
        }

        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        if (this.remoteRestServerPort === undefined) {
            const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
            this.remoteRestServerPort = restServer.clusterRestServerPort;
        }
        // An empty version string is passed when version checking is disabled.
        const version: string = this.versionCheck ? await getVersion() : '';

        // The executor produces the platform-specific start script content.
        const runScriptTrialContent: string = executor.generateStartScript(
            trialJobDetail.workingDirectory,
            trialJobId,
            getExperimentId(),
            trialJobDetail.form.sequenceId.toString(),
            this.isMultiPhase,
            this.trialConfig.command,
            nniManagerIp,
            this.remoteRestServerPort,
            version,
            this.logCollection, cudaVisible);

        // Create the temporary trial working folder locally.
        await execMkdir(path.join(trialLocalTempFolder, '.nni'));

        // Write install_nni.sh, it's not used in Windows platform.
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("install_nni")), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
        // Write the run script and the parameter file to the local temp folder.
        await fs.promises.writeFile(path.join(trialLocalTempFolder, executor.getScriptName("run")), runScriptTrialContent, { encoding: 'utf8' });
        await this.writeParameterFile(trialJobId, form.hyperParameters);
        // Copy the staged local temp folder to the remote working directory.
        await executor.copyDirectoryToRemote(trialLocalTempFolder, trialJobDetail.workingDirectory);
        // Start the run script on the remote machine without waiting for it to finish.
        // A rejection handler is attached so that a failed launch is logged instead of
        // surfacing as an unhandled promise rejection.
        executor.executeScript(executor.joinPath(trialJobDetail.workingDirectory, executor.getScriptName("run")), true, true)
            .catch((error: unknown) => {
                this.log.error(`Failed to start run script for trial ${trialJobId}: ${String(error)}`);
            });
    }

612
    /**
     * Refresh a trial's status from the state of its remote process.
     *
     * When the process whose pid is stored in `<workingDirectory>/.nni/jobpid`
     * is no longer alive, the method reads `<workingDirectory>/.nni/code`. That file
     * is expected to contain "<exitCode> <endTimestamp>". The method then sets the
     * trial's status and endTime and releases the trial's resources.
     *
     * Errors raised while querying the remote machine are not propagated. An
     * NNIError NOT_FOUND leaves the status unchanged; any other error marks the
     * trial 'UNKNOWN'. An error from `getJobPidPath` (unknown trial) still rejects.
     *
     * @param trialJob trial to refresh; it is mutated in place
     * @param executor shell executor for the machine running the trial
     * @returns the same `trialJob` object
     */
    private async updateTrialJobStatus(trialJob: RemoteMachineTrialJobDetail, executor: ShellExecutor): Promise<TrialJobDetail> {
        const jobpidPath: string = this.getJobPidPath(executor, trialJob.id);
        // Read the exit-code file from the same `.nni` folder as `jobpid`, i.e. under
        // the trial's working directory.
        const trialReturnCodeFilePath: string = executor.joinPath(trialJob.workingDirectory, '.nni', 'code');
        /* eslint-disable require-atomic-updates */
        try {
            const isAlive = await executor.isProcessAlive(jobpidPath);
            // Only a finished process has a meaningful return code file.
            if (!isAlive) {
                const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath);
                this.log.debug(`trailjob ${trialJob.id} return code: ${trialReturnCode}`);
                // "<exitCode> <endTimestamp>"; the exit code may be negative.
                const match: RegExpMatchArray | null = trialReturnCode.trim().match(/^(-?\d+)\s+(\d+)$/);
                if (match !== null) {
                    const { 1: code, 2: timestamp } = match;
                    if (parseInt(code, 10) === 0) {
                        trialJob.status = 'SUCCEEDED';
                    } else if (trialJob.isEarlyStopped === undefined) {
                        // Not cancelled by NNI, so a non-zero exit code means the trial failed.
                        trialJob.status = 'FAILED';
                    } else {
                        trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
                    }
                    trialJob.endTime = parseInt(timestamp, 10);
                    this.releaseTrialResource(trialJob);
                }
                this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
            }
        } catch (error) {
            this.log.debug(`(Ignorable mostly)Update job status exception, error is ${error.message}`);
            // A missing remote file/process is tolerated; anything else makes the state unknown.
            if (!(error instanceof NNIError && error.name === NNIErrorNames.NOT_FOUND)) {
                trialJob.status = 'UNKNOWN';
            }
        }
        /* eslint-enable require-atomic-updates */

        return trialJob;
    }

chicm-ms's avatar
chicm-ms committed
657
    /**
     * Read-only accessor for the service's metrics event emitter.
     *
     * @returns the `metricsEmitter` instance held by this service
     */
    public get MetricsEmitter(): EventEmitter {
        const emitter: EventEmitter = this.metricsEmitter;
        return emitter;
    }

661
    /**
     * Compute the remote path of the file that stores a trial's process id,
     * i.e. `<workingDirectory>/.nni/jobpid` joined with the executor's path rules.
     *
     * @param executor executor whose `joinPath` builds the remote path
     * @param jobId id of the trial
     * @returns remote path of the trial's `jobpid` file
     * @throws NNIError with name INVALID_JOB_DETAIL when the trial is not in `trialJobsMap`
     */
    private getJobPidPath(executor: ShellExecutor, jobId: string): string {
        const detail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
        if (detail !== undefined) {
            return executor.joinPath(detail.workingDirectory, '.nni', 'jobpid');
        }
        throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${jobId}`);
    }
chicm-ms's avatar
chicm-ms committed
669

chicm-ms's avatar
chicm-ms committed
670
    /**
     * Persist a trial's hyper-parameters to its machine.
     *
     * The value is first written (UTF-8) to a file named by
     * `generateParamFileName` inside `<expRootDir>/trials-local/<trialJobId>`.
     * That local file is then copied to
     * `<remote experiment root>/trials/<trialJobId>/<same file name>`.
     * The local folder is assumed to exist already.
     *
     * @param trialJobId id of the trial whose executor receives the file
     * @param hyperParameters parameters whose `value` becomes the file content
     */
    private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
        const executor = await this.getExecutor(trialJobId);
        const remoteTrialDir: string = executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), 'trials', trialJobId);
        const paramFileName: string = generateParamFileName(hyperParameters);
        const localParamFile: string = path.join(this.expRootDir, 'trials-local', trialJobId, paramFileName);

        await fs.promises.writeFile(localParamFile, hyperParameters.value, { encoding: 'utf8' });
        await executor.copyFileToRemote(localParamFile, executor.joinPath(remoteTrialDir, paramFileName));
    }
Deshui Yu's avatar
Deshui Yu committed
682
683
684
}

export { RemoteMachineTrainingService };