Unverified Commit c4d1aefe authored by QuanluZhang's avatar QuanluZhang Committed by GitHub
Browse files

Fix trialjobstate (#385)

* add one more trial job status, EARLY_STOPPED

* fix datastore/nnimanager/mockeddatastore. test/webui/metrics_reader not done. USER_TO_CANCEL

* fix bug

* modifications based on Deshui's comments

* fix bug

* fix bug in remote mode
parent c2a4ce6c
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
/** /**
* define TrialJobStatus * define TrialJobStatus
*/ */
type TrialJobStatus = 'UNKNOWN' | 'WAITING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'USER_CANCELED' | 'SYS_CANCELED'; type TrialJobStatus = 'UNKNOWN' | 'WAITING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'USER_CANCELED' | 'SYS_CANCELED' | 'EARLY_STOPPED';
type JobType = 'TRIAL' | 'HOST'; type JobType = 'TRIAL' | 'HOST';
interface TrainingServiceMetadata { interface TrainingServiceMetadata {
...@@ -113,7 +113,7 @@ abstract class TrainingService { ...@@ -113,7 +113,7 @@ abstract class TrainingService {
public abstract submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail>; public abstract submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail>;
public abstract updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail>; public abstract updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail>;
public abstract get isMultiPhaseJobSupported(): boolean; public abstract get isMultiPhaseJobSupported(): boolean;
public abstract cancelTrialJob(trialJobId: string): Promise<void>; public abstract cancelTrialJob(trialJobId: string, isEarlyStopped?: boolean): Promise<void>;
public abstract setClusterMetadata(key: string, value: string): Promise<void>; public abstract setClusterMetadata(key: string, value: string): Promise<void>;
public abstract getClusterMetadata(key: string): Promise<string>; public abstract getClusterMetadata(key: string): Promise<string>;
public abstract cleanUp(): Promise<void>; public abstract cleanUp(): Promise<void>;
......
...@@ -31,7 +31,7 @@ import * as util from 'util'; ...@@ -31,7 +31,7 @@ import * as util from 'util';
import { Database, DataStore } from './datastore'; import { Database, DataStore } from './datastore';
import { ExperimentStartupInfo, getExperimentId, setExperimentStartupInfo } from './experimentStartupInfo'; import { ExperimentStartupInfo, getExperimentId, setExperimentStartupInfo } from './experimentStartupInfo';
import { Manager } from './manager'; import { Manager } from './manager';
import { HyperParameters, TrainingService } from './trainingService'; import { HyperParameters, TrainingService, TrialJobStatus } from './trainingService';
function getExperimentRootDir(): string { function getExperimentRootDir(): string {
return path.join(os.homedir(), 'nni', 'experiments', getExperimentId()); return path.join(os.homedir(), 'nni', 'experiments', getExperimentId());
...@@ -272,5 +272,12 @@ function getIPV4Address(): string { ...@@ -272,5 +272,12 @@ function getIPV4Address(): string {
throw Error('getIPV4Address() failed because no valid IPv4 address found.') throw Error('getIPV4Address() failed because no valid IPv4 address found.')
} }
export { generateParamFileName, getMsgDispatcherCommand, getLogDir, getExperimentRootDir, /**
* Get the status of canceled jobs according to the hint isEarlyStopped
*/
function getJobCancelStatus(isEarlyStopped: boolean): TrialJobStatus {
return isEarlyStopped ? 'EARLY_STOPPED' : 'USER_CANCELED';
}
export { generateParamFileName, getMsgDispatcherCommand, getLogDir, getExperimentRootDir, getJobCancelStatus,
getDefaultDatabaseDir, getIPV4Address, mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect }; getDefaultDatabaseDir, getIPV4Address, mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect };
...@@ -271,6 +271,7 @@ class NNIDataStore implements DataStore { ...@@ -271,6 +271,7 @@ class NNIDataStore implements DataStore {
case 'FAILED': case 'FAILED':
case 'USER_CANCELED': case 'USER_CANCELED':
case 'SYS_CANCELED': case 'SYS_CANCELED':
case 'EARLY_STOPPED':
if (record.logPath !== undefined) { if (record.logPath !== undefined) {
jobInfo.logPath = record.logPath; jobInfo.logPath = record.logPath;
} }
......
...@@ -372,6 +372,7 @@ class NNIManager implements Manager { ...@@ -372,6 +372,7 @@ class NNIManager implements Manager {
switch (trialJobDetail.status) { switch (trialJobDetail.status) {
case 'SUCCEEDED': case 'SUCCEEDED':
case 'USER_CANCELED': case 'USER_CANCELED':
case 'EARLY_STOPPED':
this.trialJobs.delete(trialJobId); this.trialJobs.delete(trialJobId);
finishedTrialJobNum++; finishedTrialJobNum++;
this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: trialJobDetail.status})); this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: trialJobDetail.status}));
...@@ -594,7 +595,7 @@ class NNIManager implements Manager { ...@@ -594,7 +595,7 @@ class NNIManager implements Manager {
// ignore this event for now // ignore this event for now
break; break;
case KILL_TRIAL_JOB: case KILL_TRIAL_JOB:
await this.trainingService.cancelTrialJob(JSON.parse(content)); await this.trainingService.cancelTrialJob(JSON.parse(content), true);
break; break;
default: default:
throw new Error('Error: unsupported command type from tuner'); throw new Error('Error: unsupported command type from tuner');
......
...@@ -240,6 +240,7 @@ class MockedDataStore implements DataStore { ...@@ -240,6 +240,7 @@ class MockedDataStore implements DataStore {
case 'FAILED': case 'FAILED':
case 'USER_CANCELED': case 'USER_CANCELED':
case 'SYS_CANCELED': case 'SYS_CANCELED':
case 'EARLY_STOPPED':
jobInfo.endTime = Date.now(); jobInfo.endTime = Date.now();
} }
jobInfo.status = this.getJobStatusByLatestEvent(record.event); jobInfo.status = this.getJobStatusByLatestEvent(record.event);
...@@ -249,4 +250,4 @@ class MockedDataStore implements DataStore { ...@@ -249,4 +250,4 @@ class MockedDataStore implements DataStore {
} }
} }
export { MockedDataStore }; export { MockedDataStore };
\ No newline at end of file
...@@ -102,7 +102,7 @@ class MockedTrainingService extends TrainingService { ...@@ -102,7 +102,7 @@ class MockedTrainingService extends TrainingService {
return false; return false;
} }
public cancelTrialJob(trialJobId: string): Promise<void> { public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const deferred = new Deferred<void>(); const deferred = new Deferred<void>();
if(trialJobId === '1234' || trialJobId === '3456'){ if(trialJobId === '1234' || trialJobId === '3456'){
deferred.resolve(); deferred.resolve();
...@@ -138,4 +138,4 @@ class MockedTrainingService extends TrainingService { ...@@ -138,4 +138,4 @@ class MockedTrainingService extends TrainingService {
} }
} }
export{MockedTrainingService, testTrainingServiceProvider} export{MockedTrainingService, testTrainingServiceProvider}
\ No newline at end of file
...@@ -147,7 +147,7 @@ class TrialMetricsReader(): ...@@ -147,7 +147,7 @@ class TrialMetricsReader():
status = '' status = ''
if return_code == 0: if return_code == 0:
status = 'SUCCEEDED' status = 'SUCCEEDED'
elif return_code == 141: elif return_code > 128:
status = 'USER_CANCELED' status = 'USER_CANCELED'
else: else:
status = 'FAILED' status = 'FAILED'
......
...@@ -35,7 +35,7 @@ import { ...@@ -35,7 +35,7 @@ import {
JobApplicationForm, TrainingService, TrialJobApplicationForm, JobApplicationForm, TrainingService, TrialJobApplicationForm,
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService'; } from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString, getJobCancelStatus } from '../../common/utils';
import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSConfig } from './kubeflowConfig'; import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSConfig } from './kubeflowConfig';
import { KubeflowTrialJobDetail } from './kubeflowData'; import { KubeflowTrialJobDetail } from './kubeflowData';
import { KubeflowJobRestServer } from './kubeflowJobRestServer'; import { KubeflowJobRestServer } from './kubeflowJobRestServer';
...@@ -244,7 +244,7 @@ class KubeflowTrainingService implements TrainingService { ...@@ -244,7 +244,7 @@ class KubeflowTrainingService implements TrainingService {
return false; return false;
} }
public async cancelTrialJob(trialJobId: string): Promise<void> { public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail : KubeflowTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); const trialJobDetail : KubeflowTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if(!trialJobDetail) { if(!trialJobDetail) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`; const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`;
...@@ -265,7 +265,7 @@ class KubeflowTrainingService implements TrainingService { ...@@ -265,7 +265,7 @@ class KubeflowTrainingService implements TrainingService {
} }
trialJobDetail.endTime = Date.now(); trialJobDetail.endTime = Date.now();
trialJobDetail.status = 'USER_CANCELED'; trialJobDetail.status = getJobCancelStatus(isEarlyStopped);
return Promise.resolve(); return Promise.resolve();
} }
...@@ -518,4 +518,4 @@ class KubeflowTrainingService implements TrainingService { ...@@ -518,4 +518,4 @@ class KubeflowTrainingService implements TrainingService {
} }
} }
export { KubeflowTrainingService } export { KubeflowTrainingService }
\ No newline at end of file
...@@ -35,7 +35,7 @@ import { ...@@ -35,7 +35,7 @@ import {
HostJobApplicationForm, JobApplicationForm, HyperParameters, TrainingService, TrialJobApplicationForm, HostJobApplicationForm, JobApplicationForm, HyperParameters, TrainingService, TrialJobApplicationForm,
TrialJobDetail, TrialJobMetric, TrialJobStatus TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../../common/trainingService'; } from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, uniqueString, getJobCancelStatus } from '../../common/utils';
const tkill = require('tree-kill'); const tkill = require('tree-kill');
...@@ -246,7 +246,7 @@ class LocalTrainingService implements TrainingService { ...@@ -246,7 +246,7 @@ class LocalTrainingService implements TrainingService {
return true; return true;
} }
public async cancelTrialJob(trialJobId: string): Promise<void> { public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
this.log.info(`cancelTrialJob: ${trialJobId}`); this.log.info(`cancelTrialJob: ${trialJobId}`);
const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId); const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
if (trialJob === undefined) { if (trialJob === undefined) {
...@@ -263,7 +263,7 @@ class LocalTrainingService implements TrainingService { ...@@ -263,7 +263,7 @@ class LocalTrainingService implements TrainingService {
} else { } else {
throw new Error(`Job type not supported: ${trialJob.form.jobType}`); throw new Error(`Job type not supported: ${trialJob.form.jobType}`);
} }
this.setTrialJobStatus(trialJob, 'USER_CANCELED'); this.setTrialJobStatus(trialJob, getJobCancelStatus(isEarlyStopped));
} }
public async setClusterMetadata(key: string, value: string): Promise<void> { public async setClusterMetadata(key: string, value: string): Promise<void> {
......
...@@ -39,7 +39,7 @@ export class PAIJobInfoCollector { ...@@ -39,7 +39,7 @@ export class PAIJobInfoCollector {
constructor(jobMap: Map<string, PAITrialJobDetail>) { constructor(jobMap: Map<string, PAITrialJobDetail>) {
this.trialJobsMap = jobMap; this.trialJobsMap = jobMap;
this.statusesNeedToCheck = ['RUNNING', 'UNKNOWN', 'WAITING']; this.statusesNeedToCheck = ['RUNNING', 'UNKNOWN', 'WAITING'];
this.finalStatuses = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED']; this.finalStatuses = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'];
} }
public async retrieveTrialStatus(paiToken? : string, paiClusterConfig?: PAIClusterConfig) : Promise<void> { public async retrieveTrialStatus(paiToken? : string, paiClusterConfig?: PAIClusterConfig) : Promise<void> {
...@@ -103,7 +103,9 @@ export class PAIJobInfoCollector { ...@@ -103,7 +103,9 @@ export class PAIJobInfoCollector {
paiTrialJob.status = 'SUCCEEDED'; paiTrialJob.status = 'SUCCEEDED';
break; break;
case 'STOPPED': case 'STOPPED':
paiTrialJob.status = 'USER_CANCELED'; if (paiTrialJob.status !== 'EARLY_STOPPED') {
paiTrialJob.status = 'USER_CANCELED';
}
break; break;
case 'FAILED': case 'FAILED':
paiTrialJob.status = 'FAILED'; paiTrialJob.status = 'FAILED';
...@@ -132,4 +134,4 @@ export class PAIJobInfoCollector { ...@@ -132,4 +134,4 @@ export class PAIJobInfoCollector {
return deferred.promise; return deferred.promise;
} }
} }
\ No newline at end of file
...@@ -282,7 +282,7 @@ class PAITrainingService implements TrainingService { ...@@ -282,7 +282,7 @@ class PAITrainingService implements TrainingService {
return false; return false;
} }
public cancelTrialJob(trialJobId: string): Promise<void> { public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail : PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); const trialJobDetail : PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
const deferred : Deferred<void> = new Deferred<void>(); const deferred : Deferred<void> = new Deferred<void>();
if(!trialJobDetail) { if(!trialJobDetail) {
...@@ -312,6 +312,9 @@ class PAITrainingService implements TrainingService { ...@@ -312,6 +312,9 @@ class PAITrainingService implements TrainingService {
this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`); this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
deferred.reject(error ? error.message : 'Stop trial failed, http code: ' + response.statusCode); deferred.reject(error ? error.message : 'Stop trial failed, http code: ' + response.statusCode);
} else { } else {
if (isEarlyStopped) {
trialJobDetail.status = 'EARLY_STOPPED';
}
deferred.resolve(); deferred.resolve();
} }
}); });
...@@ -474,4 +477,4 @@ class PAITrainingService implements TrainingService { ...@@ -474,4 +477,4 @@ class PAITrainingService implements TrainingService {
} }
} }
export { PAITrainingService } export { PAITrainingService }
\ No newline at end of file
...@@ -61,7 +61,9 @@ export class MetricsCollector { ...@@ -61,7 +61,9 @@ export class MetricsCollector {
assert(trialJobDetail); assert(trialJobDetail);
// If job status is not alive again, remove its GPU reservation // If job status is not alive again, remove its GPU reservation
if(!['RUNNING'].includes(jobMetrics.jobStatus)) { if(!['RUNNING'].includes(jobMetrics.jobStatus)) {
trialJobDetail.status = jobMetrics.jobStatus; if (trialJobDetail.status !== 'EARLY_STOPPED') {
trialJobDetail.status = jobMetrics.jobStatus;
}
this.log.info(`Set trialjob ${trialJobDetail.id} status to ${trialJobDetail.status}`); this.log.info(`Set trialjob ${trialJobDetail.id} status to ${trialJobDetail.status}`);
runningJobsMap.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => { runningJobsMap.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => {
// If remote machine has no GPU, gpuReservcation is not initialized, so check if it's undefined // If remote machine has no GPU, gpuReservcation is not initialized, so check if it's undefined
......
...@@ -36,7 +36,7 @@ import { ObservableTimer } from '../../common/observableTimer'; ...@@ -36,7 +36,7 @@ import { ObservableTimer } from '../../common/observableTimer';
import { import {
HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService'; } from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, uniqueString, getJobCancelStatus } from '../../common/utils';
import { GPUSummary } from '../common/gpuData'; import { GPUSummary } from '../common/gpuData';
import { TrialConfig } from '../common/trialConfig'; import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
...@@ -234,7 +234,7 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -234,7 +234,7 @@ class RemoteMachineTrainingService implements TrainingService {
* Cancel trial job * Cancel trial job
* @param trialJobId ID of trial job * @param trialJobId ID of trial job
*/ */
public async cancelTrialJob(trialJobId: string): Promise<void> { public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
this.log.info(`cancelTrialJob: jobId: ${trialJobId}`); this.log.info(`cancelTrialJob: jobId: ${trialJobId}`);
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<void> = new Deferred<void>();
const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
...@@ -261,14 +261,15 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -261,14 +261,15 @@ class RemoteMachineTrainingService implements TrainingService {
const jobpidPath: string = this.getJobPidPath(trialJob.id); const jobpidPath: string = this.getJobPidPath(trialJob.id);
try { try {
await SSHClientUtility.remoteExeCommand(`pkill -P \`cat ${jobpidPath}\``, sshClient); await SSHClientUtility.remoteExeCommand(`pkill -P \`cat ${jobpidPath}\``, sshClient);
trialJob.status = 'USER_CANCELED'; trialJob.status = getJobCancelStatus(isEarlyStopped);
} catch (error) { } catch (error) {
// Not handle the error since pkill failed will not impact trial job's current status // Not handle the error since pkill failed will not impact trial job's current status
this.log.error(`remoteTrainingService.cancelTrialJob: ${error.message}`); this.log.error(`remoteTrainingService.cancelTrialJob: ${error.message}`);
} }
} else { } else {
// Job is not scheduled yet, set status to 'USER_CANCELLED' directly // Job is not scheduled yet, set status to 'USER_CANCELLED' directly
trialJob.status = 'USER_CANCELED'; assert(isEarlyStopped === false, 'isEarlyStopped is not supposed to be true here.');
trialJob.status = getJobCancelStatus(isEarlyStopped);
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment