Unverified Commit 9a3a75c8 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

Fix a race condition bug that does not store Trial Job cancel status correctly (#707)

* Fix a race condition bug that does not store Trial Job cancel status correctly
parent 982b30b5
......@@ -71,6 +71,7 @@ interface TrialJobDetail {
readonly workingDirectory: string;
readonly form: JobApplicationForm;
readonly sequenceId: number;
isEarlyStopped?: boolean;
}
interface HostJobDetail {
......
......@@ -46,6 +46,7 @@ export class KubeflowJobInfoCollector extends KubernetesJobInfoCollector{
try {
kubernetesJobInfo = await kubernetesCRDClient.getKubernetesJob(kubernetesTrialJob.kubernetesJobName);
} catch(error) {
// Notice: it maynot be a 'real' error since cancel trial job can also cause getKubernetesJob failed.
this.log.error(`Get job ${kubernetesTrialJob.kubernetesJobName} info failed, error is ${error}`);
//This is not treat as a error status
return Promise.resolve();
......
......@@ -34,6 +34,7 @@ export class PAITrialJobDetail implements TrialJobDetail {
public form: JobApplicationForm;
public sequenceId: number;
public hdfsLogPath: string;
public isEarlyStopped?: boolean;
constructor(id: string, status: TrialJobStatus, paiJobName : string,
submitTime: number, workingDirectory: string, form: JobApplicationForm, sequenceId: number, hdfsLogPath: string) {
......
......@@ -103,8 +103,12 @@ export class PAIJobInfoCollector {
paiTrialJob.status = 'SUCCEEDED';
break;
case 'STOPPED':
if (paiTrialJob.status !== 'EARLY_STOPPED') {
paiTrialJob.status = 'USER_CANCELED';
if (paiTrialJob.isEarlyStopped !== undefined) {
paiTrialJob.status = paiTrialJob.isEarlyStopped === true ?
'EARLY_STOPPED' : 'USER_CANCELED';
} else {
// if paiTrialJob's isEarlyStopped is undefined, that mean we didn't stop it via cancellation, mark it as SYS_CANCELLED by PAI
paiTrialJob.status = 'SYS_CANCELED';
}
break;
case 'FAILED':
......
......@@ -324,14 +324,15 @@ class PAITrainingService implements TrainingService {
"Authorization": 'Bearer ' + this.paiToken
}
};
// Set trialjobDetail's early stopped field, to mark the job's cancellation source
trialJobDetail.isEarlyStopped = isEarlyStopped;
request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
if (error || response.statusCode >= 400) {
this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
deferred.reject(error ? error.message : 'Stop trial failed, http code: ' + response.statusCode);
} else {
if (isEarlyStopped) {
trialJobDetail.status = 'EARLY_STOPPED';
}
deferred.resolve();
}
});
......
......@@ -80,6 +80,7 @@ export class RemoteMachineTrialJobDetail implements TrialJobDetail {
public form: JobApplicationForm;
public sequenceId: number;
public rmMeta?: RemoteMachineMeta;
public isEarlyStopped?: boolean;
constructor(id: string, status: TrialJobStatus, submitTime: number,
workingDirectory: string, form: JobApplicationForm, sequenceId: number) {
......
......@@ -48,7 +48,7 @@ import {
GPU_COLLECTOR_FORMAT
} from './remoteMachineData';
import { SSHClientUtility } from './sshClientUtility';
import { validateCodeDir} from '../common/util';
import { validateCodeDir } from '../common/util';
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { mkDirP } from '../../common/utils';
......@@ -279,8 +279,9 @@ class RemoteMachineTrainingService implements TrainingService {
const jobpidPath: string = this.getJobPidPath(trialJob.id);
try {
// Mark the toEarlyStop tag here
trialJob.isEarlyStopped = isEarlyStopped;
await SSHClientUtility.remoteExeCommand(`pkill -P \`cat ${jobpidPath}\``, sshClient);
trialJob.status = getJobCancelStatus(isEarlyStopped);
} catch (error) {
// Not handle the error since pkill failed will not impact trial job's current status
this.log.error(`remoteTrainingService.cancelTrialJob: ${error.message}`);
......@@ -482,6 +483,11 @@ class RemoteMachineTrainingService implements TrainingService {
if (trialJobDetail === undefined) {
throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
}
// If job is not WATIING, Don't prepare and resolve true immediately
if (trialJobDetail.status !== 'WAITING') {
deferred.resolve(true);
return deferred.promise;
}
// get an ssh client from scheduler
const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobId);
if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
......@@ -640,7 +646,12 @@ class RemoteMachineTrainingService implements TrainingService {
if (parseInt(code, 10) === 0) {
trialJob.status = 'SUCCEEDED';
} else {
trialJob.status = 'FAILED';
// isEarlyStopped is never set, mean it's not cancelled by NNI, so if the process's exit code >0, mark it as FAILED
if (trialJob.isEarlyStopped === undefined) {
trialJob.status = 'FAILED';
} else {
trialJob.status = getJobCancelStatus(trialJob.isEarlyStopped);
}
}
trialJob.endTime = parseInt(timestamp, 10);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment