remoteEnvironmentService.ts 14.8 KB
Newer Older
1
2
3
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

4
5
6
7
import fs from 'fs';
import path from 'path';
import * as component from 'common/component';
import { getLogger, Logger } from 'common/log';
8
import { EnvironmentInformation, EnvironmentService } from '../environment';
9
import { getLogLevel } from 'common/utils';
liuzhe-lz's avatar
liuzhe-lz committed
10
import { RemoteConfig, RemoteMachineConfig } from 'common/experimentConfig';
11
12
13
import { ExperimentStartupInfo } from 'common/experimentStartupInfo';
import { execMkdir } from 'training_service/common/util';
import { ExecutorManager } from 'training_service/remote_machine/remoteMachineData';
14
15
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
import { RemoteMachineEnvironmentInformation } from '../remote/remoteConfig';
16
import { SharedStorageService } from '../sharedStorage'
17
18
19
20
21

@component.Singleton
export class RemoteEnvironmentService extends EnvironmentService {

    /** Executor id used for the initial connection opened to every configured machine. */
    private readonly initExecutorId = "initConnection";
    /** Executor manager of each machine whose initial connection has been established. */
    private readonly machineExecutorManagerMap: Map<RemoteMachineConfig, ExecutorManager>;
    /** Executor manager assigned to each environment, keyed by environment id. */
    private readonly environmentExecutorManagerMap: Map<string, ExecutorManager>;
    /** Whether each connected machine is currently assigned to an environment (true = occupied). */
    private readonly remoteMachineMetaOccupiedMap: Map<RemoteMachineConfig, boolean>;
    private readonly log: Logger;
    /** Pending initial connections to all configured machines; awaited by init(). */
    private sshConnectionPromises: Promise<void[]>;
    /** Local experiment directory, taken from the startup info's logDir. */
    private experimentRootDir: string;
    /** Experiment root directory on the remote side; overwritten each time an environment is prepared. */
    private remoteExperimentRootDir: string = "";
    private experimentId: string;
    private config: RemoteConfig;
31

liuzhe-lz's avatar
liuzhe-lz committed
32
    /**
     * Validates the trial code directory and starts connecting to every configured machine.
     * The connections are not awaited here; init() waits for them.
     *
     * @param config remote training service configuration (machine list, trial code directory)
     * @param info startup info providing the experiment id and the local log directory
     * @throws Error if the trial code directory does not exist or is not a directory
     */
    constructor(config: RemoteConfig, info: ExperimentStartupInfo) {
        super();
        this.experimentId = info.experimentId;
        this.environmentExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineExecutorManagerMap = new Map<RemoteMachineConfig, ExecutorManager>();
        this.remoteMachineMetaOccupiedMap = new Map<RemoteMachineConfig, boolean>();
        this.experimentRootDir = info.logDir;
        this.log = getLogger('RemoteEnvironmentService');
        this.config = config;

        // codeDir is not a valid directory, throw Error.
        // existsSync is checked first so that a missing path produces the same clear
        // message instead of a raw ENOENT from lstatSync.
        const codeDir = this.config.trialCodeDirectory;
        if (!fs.existsSync(codeDir) || !fs.lstatSync(codeDir).isDirectory()) {
            throw new Error(`codeDir ${codeDir} is not a directory`);
        }

        this.sshConnectionPromises = Promise.all(this.config.machineList.map(
            machine => this.initRemoteMachineOnConnected(machine)
        ));
    }

    /**
     * Waits for all initial machine connections started by the constructor,
     * then registers every connected machine as not occupied.
     */
    public async init(): Promise<void> {
        await this.sshConnectionPromises;
        this.log.info('ssh connection initialized!');
        for (const rmMeta of this.machineExecutorManagerMap.keys()) {
            // initialize remoteMachineMetaOccupiedMap, false means not occupied
            this.remoteMachineMetaOccupiedMap.set(rmMeta, false);
        }
    }

    /** Number of machines that currently have an executor manager registered. */
    public get prefetchedEnvironmentCount(): number {
        const { size: connectedMachineCount } = this.machineExecutorManagerMap;
        return connectedMachineCount;
    }

    /**
     * Fixed interval of the environment maintenance loop.
     * NOTE(review): presumably milliseconds; confirm against the consumer of this value.
     */
    public get environmentMaintenceLoopInterval(): number {
        const interval = 5000;
        return interval;
    }

    /** Always false for the remote environment service. */
    public get hasMoreEnvironments(): boolean {
        const moreAvailable = false;
        return moreAvailable;
    }

    /** Always false for the remote environment service. */
    public get hasStorageService(): boolean {
        const storageServiceAvailable = false;
        return storageServiceAvailable;
    }

77
78
79
80
    /** Identifier of this environment service: always 'remote'. */
    public get getName(): string {
        const serviceName = 'remote';
        return serviceName;
    }

81
    /**
     * Picks the first machine that is not occupied and marks it as occupied.
     *
     * @returns the reserved machine, or undefined when every machine is occupied
     */
    private scheduleMachine(): RemoteMachineConfig | undefined {
        for (const [rmMeta, occupied] of this.remoteMachineMetaOccupiedMap) {
            if (!occupied) {
                this.remoteMachineMetaOccupiedMap.set(rmMeta, true);
                return rmMeta;
            }
        }
        return undefined;
    }

91
    /**
     * Opens the initial connection to one machine, registers its executor manager
     * and prepares the experiment's directories on that machine.
     *
     * @param rmMeta configuration of the machine to connect to
     */
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineConfig): Promise<void> {
        const executorManager: ExecutorManager = new ExecutorManager(rmMeta);
        this.log.info(`connecting to ${rmMeta.user}@${rmMeta.host}:${rmMeta.port}`);
        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
        this.log.debug(`reached ${executor.name}`);
        // The machine only becomes schedulable once its executor manager is registered here.
        this.machineExecutorManagerMap.set(rmMeta, executorManager);
        this.log.debug(`initializing ${executor.name}`);

        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni-experiments');
        await executor.createFolder(executor.getRemoteExperimentRootDir(this.experimentId));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(this.experimentId);

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        // NOTE(review): presumably opens up permissions on the shared nni-experiments root; confirm in ShellExecutor.
        await executor.allowPermission(true, nniRootDir);
    }
110
111

    /**
     * Refreshes the status of all given environments concurrently.
     *
     * @param environments environments whose status should be re-evaluated
     */
    public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
        const tasks = environments.map(environment => this.refreshEnvironment(environment));
        await Promise.all(tasks);
    }

116
117
    /**
     * Updates one environment's status from the pid and return-code files in its
     * runner working folder on the remote machine.
     *
     * Does nothing until the pid file exists. Once it exists the environment is marked
     * RUNNING. If the process is no longer alive and the return-code file holds
     * "<code> <timestamp>", the status becomes SUCCEEDED (code 0) or FAILED and the
     * environment's resources are released. Errors are logged, not rethrown.
     *
     * @param environment environment to refresh
     */
    private async refreshEnvironment(environment: EnvironmentInformation): Promise<void> {
        const executor = await this.getExecutor(environment.id);
        const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`;
        const runnerReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`;
        /* eslint-disable require-atomic-updates */
        try {
            // check if pid file exist
            const pidExist = await executor.fileExist(jobpidPath);
            if (!pidExist) {
                return;
            }
            const isAlive = await executor.isProcessAlive(jobpidPath);
            environment.status = 'RUNNING';
            // if the process of jobpid is not alive any more
            if (!isAlive) {
                const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
                if (remoteEnvironment.rmMachineMeta === undefined) {
                    throw new Error(`${remoteEnvironment.id} machine meta not initialized!`);
                }
                this.log.info(`pid in ${remoteEnvironment.rmMachineMeta.host}:${jobpidPath} is not alive!`);
                // The return-code file lives on the remote machine, so its existence must be
                // checked through the executor rather than on the local filesystem.
                if (await executor.fileExist(runnerReturnCodeFilePath)) {
                    const runnerReturnCode: string = await executor.getRemoteFileContent(runnerReturnCodeFilePath);
                    const match: RegExpMatchArray | null = runnerReturnCode.trim()
                        .match(/^-?(\d+)\s+(\d+)$/);
                    if (match !== null) {
                        const { 1: code } = match;
                        // Update trial job's status based on result code
                        if (parseInt(code, 10) === 0) {
                            environment.setStatus('SUCCEEDED');
                        } else {
                            environment.setStatus('FAILED');
                        }
                        await this.releaseEnvironmentResource(environment);
                    }
                }
            }
        } catch (error) {
            // A thrown value is not guaranteed to be an Error, so narrow before reading .message.
            const message = error instanceof Error ? error.message : String(error);
            this.log.error(`Update job status exception, error is ${message}`);
        }
    }

    /**
     * If an environment is finished, release the connection resource.
     *
     * Unmounts the shared storage when the environment uses it (a failed unmount is
     * only logged), releases the environment's executor and marks its machine as free.
     *
     * @param environment remote machine environment job detail
     * @throws Error if no executor manager is assigned to the environment, or its machine meta is missing
     */
    private async releaseEnvironmentResource(environment: EnvironmentInformation): Promise<void> {
        if (environment.useSharedStorage) {
            const executor = await this.getExecutor(environment.id);
            const remoteUmountCommand = component.get<SharedStorageService>(SharedStorageService).remoteUmountCommand;
            const result = await executor.executeScript(remoteUmountCommand, false, false);
            if (result.exitCode !== 0) {
                this.log.error(`Umount shared storage on remote machine failed.\n ERROR: ${result.stderr}`);
            }
        }

        const executorManager = this.environmentExecutorManagerMap.get(environment.id);
        if (executorManager === undefined) {
            throw new Error(`ExecutorManager is not assigned for environment ${environment.id}`);
        }

        // Note, the reference is still kept in environmentExecutorManagerMap, as there may be following requests from nni manager.
        executorManager.releaseExecutor(environment.id);
        const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
        if (remoteEnvironment.rmMachineMeta === undefined) {
            throw new Error(`${remoteEnvironment.id} rmMachineMeta not initialized!`);
        }
        this.remoteMachineMetaOccupiedMap.set(remoteEnvironment.rmMachineMeta, false);
    }

Ni Hao's avatar
Ni Hao committed
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
    /**
     * Builds the shell command line that starts the trial runner for an environment
     * and sets the environment's runnerWorkingFolder as a side effect.
     *
     * The command changes into the remote experiment root, runs the runner with a pid
     * file, redirects stdout/stderr into the runner working folder and then records the
     * exit status plus a timestamp in the `code` file. On Windows executors the
     * environment's own command is replaced by a prepare/install/run chain.
     *
     * NOTE(review): the status-recording step is chained with `&&`, so it only runs
     * when the preceding commands succeed; confirm that this is intended.
     *
     * @param environment environment to build the script for
     * @returns the complete command line
     */
    private async getScript(environment: EnvironmentInformation): Promise<string> {
        const executor = await this.getExecutor(environment.id);
        const isDebug = getLogLevel() === "debug";
        let script: string = environment.command;
        environment.runnerWorkingFolder = executor.joinPath(this.remoteExperimentRootDir, 'envs', environment.id);

        let codeScript = `echo $? \`date +%s%3N\` >${environment.runnerWorkingFolder}/code`;
        if (executor.isWindows) {
            const prepare = `mkdir envs\\${environment.id} 2>NUL & cd envs\\${environment.id}`;
            const startrun = `powershell ..\\install_nni.ps1 && python -m nni.tools.trial_tool.trial_runner`;
            const developingScript = "IF EXIST nni_trial_tool (ECHO \"nni_trial_tool exists already\") ELSE (mkdir nni_trial_tool && tar -xof ../nni_trial_tool.tar.gz -C ./nni_trial_tool) && pip3 install websockets";

            script = isDebug ? `${prepare} && ${developingScript} && ${startrun}` : `${prepare} && ${startrun}`;
            // The path is on the remote machine, so it is joined with the executor's rules
            // rather than with the local platform's path module.
            codeScript = `powershell -command "Write $? " " (((New-TimeSpan -Start (Get-Date "01/01/1970") -End (Get-Date).ToUniversalTime()).TotalMilliseconds).ToString("0")) | Out-file ${executor.joinPath(environment.runnerWorkingFolder, 'code')} -Append -NoNewline -encoding utf8"`;
        }

        script = `cd ${this.remoteExperimentRootDir} && \
            ${script} --job_pid_file ${environment.runnerWorkingFolder}/pid \
            1>${environment.runnerWorkingFolder}/trialrunner_stdout 2>${environment.runnerWorkingFolder}/trialrunner_stderr \
            && ${codeScript}`;

        return script;
    }

209
210
211
212
213
214
215
216
217
218
219
    /**
     * Starts an environment: marks it WAITING, reserves a machine and generates the
     * runner command, then launches the runner on that machine.
     *
     * @param environment environment to start
     * @throws Error if no machine is available, or if preparing or launching fails
     */
    public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
        const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
        remoteEnvironment.status = 'WAITING';
        // schedule machine for environment, generate command
        const prepared = await this.prepareEnvironment(remoteEnvironment);
        if (!prepared) {
            // Without a machine no executor manager is assigned, so launching would fail
            // later with an unrelated message; fail here with the real cause instead.
            throw new Error(`No available machine for environment ${environment.id}`);
        }
        // launch runner process in machine
        await this.launchRunner(remoteEnvironment);
    }

    /**
     * Reserves a machine for the environment, binds the machine's executor manager to
     * the environment id, resolves the remote experiment root (mounting the shared
     * storage when the environment uses it) and generates the runner command.
     *
     * @param environment environment to prepare; its rmMachineMeta, command and useActiveGpu are set
     * @returns true when the environment was prepared, false when no machine is available
     * @throws Error if the scheduled machine has no executor manager or mounting the shared storage fails
     */
    private async prepareEnvironment(environment: RemoteMachineEnvironmentInformation): Promise<boolean> {
        // get an executor from scheduler
        const rmMachineMeta: RemoteMachineConfig | undefined = this.scheduleMachine();
        if (rmMachineMeta === undefined) {
            this.log.warning(`No available machine!`);
            return false;
        }

        environment.rmMachineMeta = rmMachineMeta;
        const executorManager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(rmMachineMeta);
        if (executorManager === undefined) {
            throw new Error(`executorManager not initialized`);
        }
        this.environmentExecutorManagerMap.set(environment.id, executorManager);
        const executor = await this.getExecutor(environment.id);
        if (environment.useSharedStorage) {
            const sharedStorage = component.get<SharedStorageService>(SharedStorageService);
            this.remoteExperimentRootDir = sharedStorage.remoteWorkingRoot;
            // A relative working root is resolved against the executor's current path.
            if (!this.remoteExperimentRootDir.startsWith('/')) {
                this.remoteExperimentRootDir = executor.joinPath((await executor.getCurrentPath()).trim(), this.remoteExperimentRootDir);
            }
            // Normalise every `echo` to `echo -e` and add extra escaping to `\$` before running the mount command remotely.
            const remoteMountCommand = sharedStorage.remoteMountCommand.replace(/echo -e /g, `echo `).replace(/echo /g, `echo -e `).replace(/\\\$/g, `\\\\\\$`);
            const result = await executor.executeScript(remoteMountCommand, false, false);
            if (result.exitCode !== 0) {
                throw new Error(`Mount shared storage on remote machine failed.\n ERROR: ${result.stderr}`);
            }
        } else {
            this.remoteExperimentRootDir = executor.getRemoteExperimentRootDir(this.experimentId);
        }

        environment.command = await this.getScript(environment);
        environment.useActiveGpu = rmMachineMeta.useActiveGpu;
        return true;
    }

    /**
     * Writes the environment's command into a local run script, copies it to the
     * remote experiment root and starts it there without waiting for it to finish.
     * Also sets the environment's trackingUrl.
     *
     * @param environment prepared environment (machine assigned, command generated)
     * @throws Error if the environment has no executor manager or no machine meta
     */
    private async launchRunner(environment: RemoteMachineEnvironmentInformation): Promise<void> {
        const executor = await this.getExecutor(environment.id);
        // Validate before anything is created or launched on the remote machine.
        if (environment.rmMachineMeta === undefined) {
            throw new Error(`${environment.id} rmMachineMeta not initialized!`);
        }
        const { host } = environment.rmMachineMeta;

        const environmentLocalTempFolder: string =
            path.join(this.experimentRootDir, "environment-temp");
        await executor.createFolder(environment.runnerWorkingFolder);
        await execMkdir(environmentLocalTempFolder);
        await fs.promises.writeFile(path.join(environmentLocalTempFolder, executor.getScriptName("run")),
            environment.command, { encoding: 'utf8' });
        // Copy the local temp folder (containing the run script) to the remote experiment root
        await executor.copyDirectoryToRemote(environmentLocalTempFolder, this.remoteExperimentRootDir);
        // Execute command in remote machine, set isInteractive=true to run script in conda environment.
        // The call is deliberately not awaited; a rejection is logged instead of being left unhandled.
        const remoteRunScript = executor.joinPath(this.remoteExperimentRootDir, executor.getScriptName("run"));
        executor.executeScript(remoteRunScript, true, true).catch((error: unknown) => {
            this.log.error(`launchRunner: executing ${remoteRunScript} failed: ${error}`);
        });
        environment.trackingUrl = `file://${host}:${environment.runnerWorkingFolder}`;
    }

    /**
     * Resolves the shell executor for an environment through the executor manager
     * that was assigned to it.
     *
     * @param environmentId id of the environment
     * @returns the executor obtained from the assigned manager
     * @throws Error if no executor manager is assigned to the environment
     */
    private async getExecutor(environmentId: string): Promise<ShellExecutor> {
        const assignedManager = this.environmentExecutorManagerMap.get(environmentId);
        if (assignedManager !== undefined) {
            return await assignedManager.getExecutor(environmentId);
        }
        throw new Error(`ExecutorManager is not assigned for environment ${environmentId}`);
    }

    /**
     * Stops an environment that is still alive.
     *
     * An environment in UNKNOWN status is marked USER_CANCELED and released directly.
     * Otherwise the child processes recorded in its pid file are killed and its
     * resources released; errors from that step are logged, not rethrown.
     *
     * @param environment environment to stop
     */
    public async stopEnvironment(environment: EnvironmentInformation): Promise<void> {
        if (environment.isAlive === false) {
            return;
        }

        const executor = await this.getExecutor(environment.id);

        if (environment.status === 'UNKNOWN') {
            environment.status = 'USER_CANCELED';
            await this.releaseEnvironmentResource(environment);
            return;
        }

        const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`;
        try {
            await executor.killChildProcesses(jobpidPath);
            await this.releaseEnvironmentResource(environment);
        } catch (error) {
            this.log.error(`stopEnvironment: ${error}`);
        }
    }
}