Unverified Commit add7ca66 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Fix remote reuse bugs (#2981)

parent 058b58a6
...@@ -136,6 +136,10 @@ class LinuxCommands extends OsCommands { ...@@ -136,6 +136,10 @@ class LinuxCommands extends OsCommands {
return `${preCommand} && ${command}`; return `${preCommand} && ${command}`;
} }
} }
public fileExistCommand(filePath: string): string {
return `test -e ${filePath} && echo True || echo False`;
}
} }
export { LinuxCommands }; export { LinuxCommands };
...@@ -130,6 +130,10 @@ class WindowsCommands extends OsCommands { ...@@ -130,6 +130,10 @@ class WindowsCommands extends OsCommands {
return `${preCommand} && set prePath=%path% && ${command}`; return `${preCommand} && set prePath=%path% && ${command}`;
} }
} }
public fileExistCommand(filePath: string): string {
return `powershell Test-Path ${filePath} -PathType Leaf`;
}
} }
export { WindowsCommands }; export { WindowsCommands };
...@@ -29,6 +29,7 @@ abstract class OsCommands { ...@@ -29,6 +29,7 @@ abstract class OsCommands {
public abstract extractFile(tarFileName: string, targetFolder: string): string; public abstract extractFile(tarFileName: string, targetFolder: string): string;
public abstract executeScript(script: string, isFile: boolean): string; public abstract executeScript(script: string, isFile: boolean): string;
public abstract addPreCommand(preCommand: string | undefined, command: string | undefined): string | undefined; public abstract addPreCommand(preCommand: string | undefined, command: string | undefined): string | undefined;
public abstract fileExistCommand(filePath: string): string | undefined;
public joinPath(...paths: string[]): string { public joinPath(...paths: string[]): string {
let dir: string = paths.filter((path: any) => path !== '').join(this.pathSpliter); let dir: string = paths.filter((path: any) => path !== '').join(this.pathSpliter);
......
...@@ -238,6 +238,12 @@ class ShellExecutor { ...@@ -238,6 +238,12 @@ class ShellExecutor {
return commandResult.exitCode == 0; return commandResult.exitCode == 0;
} }
public async fileExist(filePath: string): Promise<boolean> {
const commandText = this.osCommands && this.osCommands.fileExistCommand(filePath);
const commandResult = await this.execute(commandText);
return commandResult.stdout !== undefined && commandResult.stdout.trim() === 'True';
}
public async extractFile(tarFileName: string, targetFolder: string): Promise<boolean> { public async extractFile(tarFileName: string, targetFolder: string): Promise<boolean> {
const commandText = this.osCommands && this.osCommands.extractFile(tarFileName, targetFolder); const commandText = this.osCommands && this.osCommands.extractFile(tarFileName, targetFolder);
const commandResult = await this.execute(commandText); const commandResult = await this.execute(commandText);
......
...@@ -137,40 +137,43 @@ export class RemoteEnvironmentService extends EnvironmentService { ...@@ -137,40 +137,43 @@ export class RemoteEnvironmentService extends EnvironmentService {
private async refreshEnvironment(environment: EnvironmentInformation): Promise<void> { private async refreshEnvironment(environment: EnvironmentInformation): Promise<void> {
const executor = await this.getExecutor(environment.id); const executor = await this.getExecutor(environment.id);
const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`; const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`;
const runnerReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`; const runnerReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`;
if (fs.existsSync(jobpidPath)) { /* eslint-disable require-atomic-updates */
/* eslint-disable require-atomic-updates */ try {
try { // check if pid file exist
const isAlive = await executor.isProcessAlive(jobpidPath); const pidExist = await executor.fileExist(jobpidPath);
// if the process of jobpid is not alive any more if (!pidExist) {
if (!isAlive) { return;
const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation; }
if (remoteEnvironment.rmMachineMeta === undefined) { const isAlive = await executor.isProcessAlive(jobpidPath);
throw new Error(`${remoteEnvironment.id} machine meta not initialized!`); environment.status = 'RUNNING';
} // if the process of jobpid is not alive any more
this.log.info(`pid in ${remoteEnvironment.rmMachineMeta.ip}:${jobpidPath} is not alive!`); if (!isAlive) {
if (fs.existsSync(runnerReturnCodeFilePath)) { const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
const runnerReturnCode: string = await executor.getRemoteFileContent(runnerReturnCodeFilePath); if (remoteEnvironment.rmMachineMeta === undefined) {
const match: RegExpMatchArray | null = runnerReturnCode.trim() throw new Error(`${remoteEnvironment.id} machine meta not initialized!`);
.match(/^-?(\d+)\s+(\d+)$/); }
if (match !== null) { this.log.info(`pid in ${remoteEnvironment.rmMachineMeta.ip}:${jobpidPath} is not alive!`);
const { 1: code } = match; if (fs.existsSync(runnerReturnCodeFilePath)) {
// Update trial job's status based on result code const runnerReturnCode: string = await executor.getRemoteFileContent(runnerReturnCodeFilePath);
if (parseInt(code, 10) === 0) { const match: RegExpMatchArray | null = runnerReturnCode.trim()
environment.setStatus('SUCCEEDED'); .match(/^-?(\d+)\s+(\d+)$/);
} else { if (match !== null) {
environment.setStatus('FAILED'); const { 1: code } = match;
} // Update trial job's status based on result code
this.releaseEnvironmentResource(environment); if (parseInt(code, 10) === 0) {
} environment.setStatus('SUCCEEDED');
} else {
environment.setStatus('FAILED');
} }
this.releaseEnvironmentResource(environment);
} }
} catch (error) {
this.releaseEnvironmentResource(environment);
this.log.error(`Update job status exception, error is ${error.message}`);
} }
} }
} catch (error) {
this.log.error(`Update job status exception, error is ${error.message}`);
}
} }
public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> { public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
...@@ -245,6 +248,7 @@ export class RemoteEnvironmentService extends EnvironmentService { ...@@ -245,6 +248,7 @@ export class RemoteEnvironmentService extends EnvironmentService {
'envs', environment.id) 'envs', environment.id)
environment.command = `cd ${environment.runnerWorkingFolder} && \ environment.command = `cd ${environment.runnerWorkingFolder} && \
${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \ ${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \
1>${environment.runnerWorkingFolder}/trialrunner_stdout 2>${environment.runnerWorkingFolder}/trialrunner_stderr \
&& echo $? \`date +%s%3N\` >${environment.runnerWorkingFolder}/code`; && echo $? \`date +%s%3N\` >${environment.runnerWorkingFolder}/code`;
return Promise.resolve(true); return Promise.resolve(true);
} }
...@@ -266,7 +270,6 @@ ${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \ ...@@ -266,7 +270,6 @@ ${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \
// Execute command in remote machine // Execute command in remote machine
executor.executeScript(executor.joinPath(environment.runnerWorkingFolder, executor.executeScript(executor.joinPath(environment.runnerWorkingFolder,
executor.getScriptName("run")), true, false); executor.getScriptName("run")), true, false);
environment.status = 'RUNNING';
if (environment.rmMachineMeta === undefined) { if (environment.rmMachineMeta === undefined) {
throw new Error(`${environment.id} rmMachineMeta not initialized!`); throw new Error(`${environment.id} rmMachineMeta not initialized!`);
} }
......
...@@ -662,19 +662,22 @@ class TrialDispatcher implements TrainingService { ...@@ -662,19 +662,22 @@ class TrialDispatcher implements TrainingService {
trial.status = "RUNNING"; trial.status = "RUNNING";
await this.commandChannel.sendCommand(trial.environment, NEW_TRIAL_JOB, trial.settings); await this.commandChannel.sendCommand(trial.environment, NEW_TRIAL_JOB, trial.settings);
} }
/**
* release the trial assigned environment resources
* @param trial
*/
private releaseEnvironment(trial: TrialDetail): void { private releaseEnvironment(trial: TrialDetail): void {
if (undefined === trial.environment) { if (trial.environment !== undefined) {
throw new Error(`TrialDispatcher: environment is not assigned to trial ${trial.id}, and cannot be released!`); if (trial.environment.runningTrialCount <= 0) {
} throw new Error(`TrialDispatcher: environment ${trial.environment.id} has no counted running trial!`);
if (trial.environment.runningTrialCount <= 0) { }
throw new Error(`TrialDispatcher: environment ${trial.environment.id} has no counted running trial!`); trial.environment.runningTrialCount--;
trial.environment = undefined;
} }
if (true === this.enableGpuScheduler) { if (true === this.enableGpuScheduler) {
this.gpuScheduler.removeGpuReservation(trial); this.gpuScheduler.removeGpuReservation(trial);
} }
trial.environment.runningTrialCount--;
trial.environment = undefined;
} }
private async handleMetricData(trialId: string, data: any): Promise<void> { private async handleMetricData(trialId: string, data: any): Promise<void> {
......
...@@ -95,6 +95,8 @@ pai: ...@@ -95,6 +95,8 @@ pai:
containerNFSMountPath: containerNFSMountPath:
paiStorageConfigName: paiStorageConfigName:
remote: remote:
remoteConfig:
reuse: false
machineList: machineList:
- ip: - ip:
passwd: passwd:
......
...@@ -86,6 +86,8 @@ def update_training_service_config(args): ...@@ -86,6 +86,8 @@ def update_training_service_config(args):
config[args.ts]['machineList'][0]['port'] = args.remote_port config[args.ts]['machineList'][0]['port'] = args.remote_port
if args.remote_pwd is not None: if args.remote_pwd is not None:
config[args.ts]['machineList'][0]['passwd'] = args.remote_pwd config[args.ts]['machineList'][0]['passwd'] = args.remote_pwd
if args.remote_reuse is not None:
config[args.ts]['remoteConfig']['reuse'] = args.remote_reuse.lower() == 'true'
dump_yml_content(TRAINING_SERVICE_FILE, config) dump_yml_content(TRAINING_SERVICE_FILE, config)
...@@ -119,6 +121,7 @@ if __name__ == '__main__': ...@@ -119,6 +121,7 @@ if __name__ == '__main__':
parser.add_argument("--remote_pwd", type=str) parser.add_argument("--remote_pwd", type=str)
parser.add_argument("--remote_host", type=str) parser.add_argument("--remote_host", type=str)
parser.add_argument("--remote_port", type=int) parser.add_argument("--remote_port", type=int)
parser.add_argument("--remote_reuse", type=str)
args = parser.parse_args() args = parser.parse_args()
update_training_service_config(args) update_training_service_config(args)
...@@ -62,7 +62,7 @@ jobs: ...@@ -62,7 +62,7 @@ jobs:
- script: | - script: |
set -e set -e
cd test cd test
python3 nni_test/nnitest/generate_ts_config.py --ts remote --remote_user $(docker_user) --remote_host $(remote_host) \ python3 nni_test/nnitest/generate_ts_config.py --ts remote --remote_reuse $(remote_reuse) --remote_user $(docker_user) --remote_host $(remote_host) \
--remote_port $(cat port) --remote_pwd $(docker_pwd) --nni_manager_ip $(nni_manager_ip) --remote_port $(cat port) --remote_pwd $(docker_pwd) --nni_manager_ip $(nni_manager_ip)
cat config/training_service.yml cat config/training_service.yml
PATH=$HOME/.local/bin:$PATH python3 nni_test/nnitest/run_tests.py --config config/integration_tests.yml --ts remote PATH=$HOME/.local/bin:$PATH python3 nni_test/nnitest/run_tests.py --config config/integration_tests.yml --ts remote
......
...@@ -137,10 +137,15 @@ class Trial: ...@@ -137,10 +137,15 @@ class Trial:
def kill(self, trial_id=None): def kill(self, trial_id=None):
if trial_id == self.id or trial_id is None: if trial_id == self.id or trial_id is None:
if self.process is not None: if self.process is not None:
nni_log(LogType.Info, "%s: killing trial" % self.name) try:
for child in psutil.Process(self.process.pid).children(True): nni_log(LogType.Info, "%s: killing trial" % self.name)
child.kill() for child in psutil.Process(self.process.pid).children(True):
self.process.kill() child.kill()
self.process.kill()
except psutil.NoSuchProcess:
nni_log(LogType.Info, "kill trial %s failed: %s does not exist!" % (trial_id, self.process.pid))
except Exception as ex:
nni_log(LogType.Error, "kill trial %s failed: %s " % (trial_id, str(ex)))
self.cleanup() self.cleanup()
def cleanup(self): def cleanup(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment