Unverified Commit 252d35e0 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support multi trial jobs on same GPU (#1109)

parent 0feed014
......@@ -399,6 +399,15 @@ machineList:
__gpuIndices__ is used to specify designated GPU devices for NNI. If it is set, only the specified GPU devices are used for NNI trial jobs. Single or multiple GPU indices can be specified; multiple GPU indices are separated by commas (,), such as `1` or `0,1,3`.
* __maxTrialNumPerGpu__
__maxTrialNumPerGpu__ is used to specify the maximum number of trials that can run concurrently on a single GPU device.
* __useActiveGpu__
__useActiveGpu__ is used to specify whether NNI may use a GPU that is already running another process. By default, NNI uses a GPU only if no other active process is running on it; if __useActiveGpu__ is set to true, NNI uses the GPU regardless of other processes. This field is not applicable for NNI on Windows.
* __machineList__
__machineList__ should be set if __trainingServicePlatform__ is set to remote, or it should be empty.
......@@ -433,6 +442,14 @@ machineList:
__gpuIndices__ is used to specify designated GPU devices for NNI on this remote machine. If it is set, only the specified GPU devices are used for NNI trial jobs. Single or multiple GPU indices can be specified; multiple GPU indices are separated by commas (,), such as `1` or `0,1,3`.
* __maxTrialNumPerGpu__
__maxTrialNumPerGpu__ is used to specify the maximum number of trials that can run concurrently on a single GPU device.
* __useActiveGpu__
__useActiveGpu__ is used to specify whether NNI may use a GPU that is already running another process. By default, NNI uses a GPU only if no other active process is running on it; if __useActiveGpu__ is set to true, NNI uses the GPU regardless of other processes. This field is not applicable for NNI on Windows.
* __kubeflowConfig__:
* __operator__
......
authorName: default
experimentName: example_pytorch_cifar10
trialConcurrency: 1
trialConcurrency: 4
maxExecDuration: 100h
maxTrialNum: 10
#choice: local, remote, pai
......@@ -19,3 +19,5 @@ trial:
command: python3 main.py
codeDir: .
gpuNum: 1
localConfig:
maxTrialNumPerGpu: 2
......@@ -31,10 +31,14 @@ export namespace ValidationSchemas {
passwd: joi.string(),
sshKeyPath: joi.string(),
passphrase: joi.string(),
gpuIndices: joi.string()
gpuIndices: joi.string(),
maxTrialNumPerGpu: joi.number(),
useActiveGpu: joi.boolean()
})),
local_config: joi.object({
gpuIndices: joi.string()
gpuIndices: joi.string(),
maxTrialNumPerGpu: joi.number(),
useActiveGpu: joi.boolean()
}),
trial_config: joi.object({
image: joi.string().min(1),
......
......@@ -71,14 +71,15 @@ class GPUScheduler {
execScript(gpuMetricsCollectorScriptPath)
}
public getAvailableGPUIndices(): number[] {
public getAvailableGPUIndices(useActiveGpu: boolean, occupiedGpuIndexNumMap: Map<number, number>): number[] {
if (this.gpuSummary !== undefined) {
if(process.platform === 'win32') {
if(process.platform === 'win32' || useActiveGpu) {
return this.gpuSummary.gpuInfos.map((info: GPUInfo) => info.index);
}
else{
return this.gpuSummary.gpuInfos.filter((info: GPUInfo) => info.activeProcessNum === 0)
.map((info: GPUInfo) => info.index);
return this.gpuSummary.gpuInfos.filter((info: GPUInfo) =>
occupiedGpuIndexNumMap.get(info.index) === undefined && info.activeProcessNum === 0 ||
occupiedGpuIndexNumMap.get(info.index) !== undefined).map((info: GPUInfo) => info.index);
}
}
......
......@@ -97,11 +97,19 @@ class LocalTrialJobDetail implements TrialJobDetail {
* Local training service config
*/
class LocalConfig {
public maxTrialNumPerGpu?: number;
public gpuIndices?: string;
constructor(gpuIndices?: string) {
public useActiveGpu?: boolean;
constructor(gpuIndices?: string, maxTrialNumPerGpu?: number, useActiveGpu?: boolean) {
if (gpuIndices !== undefined) {
this.gpuIndices = gpuIndices;
}
if (maxTrialNumPerGpu !== undefined) {
this.maxTrialNumPerGpu = maxTrialNumPerGpu;
}
if (useActiveGpu !== undefined) {
this.useActiveGpu = useActiveGpu;
}
}
}
......@@ -117,13 +125,15 @@ class LocalTrainingService implements TrainingService {
private rootDir!: string;
private trialSequenceId: number;
private gpuScheduler!: GPUScheduler;
private occupiedGpuIndices: Set<number>;
private occupiedGpuIndexNumMap: Map<number, number>;
private designatedGpuIndices!: Set<number>;
private log: Logger;
private localTrailConfig?: TrialConfig;
private localConfig?: LocalConfig;
private isMultiPhase: boolean = false;
private isMultiPhase: boolean;
private jobStreamMap: Map<string, ts.Stream>;
private maxTrialNumPerGpu: number;
private useActiveGpu: boolean;
constructor() {
this.eventEmitter = new EventEmitter();
......@@ -135,7 +145,10 @@ class LocalTrainingService implements TrainingService {
this.trialSequenceId = -1;
this.jobStreamMap = new Map<string, ts.Stream>();
this.log.info('Construct local machine training service.');
this.occupiedGpuIndices = new Set<number>();
this.occupiedGpuIndexNumMap = new Map<number, number>();
this.maxTrialNumPerGpu = 1;
this.useActiveGpu = false;
this.isMultiPhase = false;
}
public async run(): Promise<void> {
......@@ -304,6 +317,13 @@ class LocalTrainingService implements TrainingService {
throw new Error('gpuIndices can not be empty if specified.');
}
}
if (this.localConfig.maxTrialNumPerGpu !== undefined) {
this.maxTrialNumPerGpu = this.localConfig.maxTrialNumPerGpu;
}
if (this.localConfig.useActiveGpu !== undefined) {
this.useActiveGpu = this.localConfig.useActiveGpu;
}
break;
case TrialConfigMetadataKey.MULTI_PHASE:
this.isMultiPhase = (value === 'true' || value === 'True');
......@@ -356,7 +376,14 @@ class LocalTrainingService implements TrainingService {
if (trialJob.gpuIndices !== undefined && trialJob.gpuIndices.length > 0 && this.gpuScheduler !== undefined) {
if (oldStatus === 'RUNNING' && trialJob.status !== 'RUNNING') {
for (const index of trialJob.gpuIndices) {
this.occupiedGpuIndices.delete(index);
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined) {
throw new Error(`gpu resource schedule error`);
} else if(num === 1) {
this.occupiedGpuIndexNumMap.delete(index);
} else {
this.occupiedGpuIndexNumMap.set(index, num - 1)
}
}
}
}
......@@ -396,8 +423,14 @@ class LocalTrainingService implements TrainingService {
return [true, resource];
}
let selectedGPUIndices: number[] = this.gpuScheduler.getAvailableGPUIndices()
.filter((index: number) => !this.occupiedGpuIndices.has(index));
let selectedGPUIndices: number[] = [];
let availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.useActiveGpu, this.occupiedGpuIndexNumMap);
for(let index of availableGpuIndices) {
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined || num < this.maxTrialNumPerGpu) {
selectedGPUIndices.push(index);
}
}
if (this.designatedGpuIndices !== undefined) {
this.checkSpecifiedGpuIndices();
......@@ -428,7 +461,12 @@ class LocalTrainingService implements TrainingService {
private occupyResource(resource: {gpuIndices: number[]}): void {
if (this.gpuScheduler !== undefined) {
for (const index of resource.gpuIndices) {
this.occupiedGpuIndices.add(index);
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined) {
this.occupiedGpuIndexNumMap.set(index, 1)
} else {
this.occupiedGpuIndexNumMap.set(index, num + 1)
}
}
}
}
......
......@@ -23,7 +23,8 @@ import * as assert from 'assert';
import { getLogger, Logger } from '../../common/log';
import { randomSelect } from '../../common/utils';
import { GPUInfo } from '../common/gpuData';
import { parseGpuIndices, RemoteMachineMeta, RemoteMachineScheduleResult, ScheduleResultType, SSHClientManager } from './remoteMachineData';
import { RemoteMachineTrialJobDetail, parseGpuIndices, RemoteMachineMeta, RemoteMachineScheduleResult, ScheduleResultType, SSHClientManager } from './remoteMachineData';
import { TrialJobDetail } from 'common/trainingService';
/**
* A simple GPU scheduler implementation
......@@ -45,7 +46,7 @@ export class GPUScheduler {
* Schedule a machine according to the constraints (requiredGPUNum)
* @param requiredGPUNum required GPU number
*/
public scheduleMachine(requiredGPUNum: number, trialJobId : string) : RemoteMachineScheduleResult {
public scheduleMachine(requiredGPUNum: number, trialJobDetail : RemoteMachineTrialJobDetail) : RemoteMachineScheduleResult {
assert(requiredGPUNum >= 0);
const allRMs: RemoteMachineMeta[] = Array.from(this.machineSSHClientMap.keys());
assert(allRMs.length > 0);
......@@ -66,7 +67,7 @@ export class GPUScheduler {
// Currently the requiredGPUNum parameter is identical for all trial jobs.
if (requiredGPUNum > 0) {
// Trial job requires GPU
const result: RemoteMachineScheduleResult | undefined = this.scheduleGPUHost(requiredGPUNum, trialJobId);
const result: RemoteMachineScheduleResult | undefined = this.scheduleGPUHost(requiredGPUNum, trialJobDetail);
if (result !== undefined) {
return result;
}
......@@ -74,9 +75,9 @@ export class GPUScheduler {
// Trial job does not need GPU
const allocatedRm: RemoteMachineMeta = this.selectMachine(allRMs);
return this.allocateHost(requiredGPUNum, allocatedRm, [], trialJobId);
return this.allocateHost(requiredGPUNum, allocatedRm, [], trialJobDetail);
}
this.log.warning(`Scheduler: trialJob id ${trialJobId}, no machine can be scheduled, return TMP_NO_AVAILABLE_GPU `);
this.log.warning(`Scheduler: trialJob id ${trialJobDetail.id}, no machine can be scheduled, return TMP_NO_AVAILABLE_GPU `);
return {
resultType : ScheduleResultType.TMP_NO_AVAILABLE_GPU,
......@@ -87,21 +88,35 @@ export class GPUScheduler {
/**
* remove the job's gpu reservation
*/
public removeGpuReservation(trialJobId: string, rmMeta?: RemoteMachineMeta): void {
// If remote machine has no GPU, gpuReservcation is not initialized, so check if it's undefined
if (rmMeta !== undefined && rmMeta.gpuReservation !== undefined) {
rmMeta.gpuReservation.forEach((reserveTrialJobId : string, gpuIndex : number) => {
if (reserveTrialJobId === trialJobId) {
rmMeta.gpuReservation.delete(gpuIndex);
public removeGpuReservation(trialJobId: string, trialJobMap: Map<string, RemoteMachineTrialJobDetail>): void {
let trialJobDetail: RemoteMachineTrialJobDetail | undefined = trialJobMap.get(trialJobId);
if(trialJobDetail === undefined) {
throw new Error(`could not get trialJobDetail by id ${trialJobId}`);
}
if (trialJobDetail.rmMeta !== undefined &&
trialJobDetail.rmMeta.occupiedGpuIndexMap !== undefined &&
trialJobDetail.gpuIndices !== undefined &&
trialJobDetail.gpuIndices.length > 0) {
for (const gpuInfo of trialJobDetail.gpuIndices) {
let num: number | undefined = trialJobDetail.rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if(num !== undefined) {
if(num === 1) {
trialJobDetail.rmMeta.occupiedGpuIndexMap.delete(gpuInfo.index);
} else {
trialJobDetail.rmMeta.occupiedGpuIndexMap.set(gpuInfo.index, num - 1)
}
}
});
}
}
trialJobDetail.gpuIndices = [];
trialJobMap.set(trialJobId, trialJobDetail);
}
private scheduleGPUHost(requiredGPUNum: number, trialJobId: string): RemoteMachineScheduleResult | undefined {
private scheduleGPUHost(requiredGPUNum: number, trialJobDetail: RemoteMachineTrialJobDetail): RemoteMachineScheduleResult | undefined {
const totalResourceMap: Map<RemoteMachineMeta, GPUInfo[]> = this.gpuResourceDetection();
const qualifiedRMs: RemoteMachineMeta[] = [];
totalResourceMap.forEach((gpuInfos: GPUInfo[], rmMeta: RemoteMachineMeta) => {
if (gpuInfos !== undefined && gpuInfos.length >= requiredGPUNum) {
qualifiedRMs.push(rmMeta);
}
......@@ -110,7 +125,7 @@ export class GPUScheduler {
const allocatedRm: RemoteMachineMeta = this.selectMachine(qualifiedRMs);
const gpuInfos: GPUInfo[] | undefined = totalResourceMap.get(allocatedRm);
if (gpuInfos !== undefined) { // should always true
return this.allocateHost(requiredGPUNum, allocatedRm, gpuInfos, trialJobId);
return this.allocateHost(requiredGPUNum, allocatedRm, gpuInfos, trialJobDetail);
} else {
assert(false, 'gpuInfos is undefined');
}
......@@ -130,9 +145,6 @@ export class GPUScheduler {
// Assign total GPU count as the initial available GPU number
if (rmMeta.gpuSummary !== undefined) {
const availableGPUs: GPUInfo[] = [];
if (rmMeta.gpuReservation === undefined) {
rmMeta.gpuReservation = new Map<number, string>();
}
const designatedGpuIndices: Set<number> | undefined = parseGpuIndices(rmMeta.gpuIndices);
if (designatedGpuIndices !== undefined) {
for (const gpuIndex of designatedGpuIndices) {
......@@ -145,10 +157,20 @@ export class GPUScheduler {
rmMeta.gpuSummary.gpuInfos.forEach((gpuInfo: GPUInfo) => {
// if the GPU has active process, OR be reserved by a job,
// or index not in gpuIndices configuration in machineList,
// or trial number on a GPU reach max number,
// We should NOT allocate this GPU
if (gpuInfo.activeProcessNum === 0 && !rmMeta.gpuReservation.has(gpuInfo.index)
&& (designatedGpuIndices === undefined || designatedGpuIndices.has(gpuInfo.index))) {
availableGPUs.push(gpuInfo);
// if users set useActiveGpu, use the gpu whether there is another activeProcess
if (designatedGpuIndices === undefined || designatedGpuIndices.has(gpuInfo.index)) {
if(rmMeta.occupiedGpuIndexMap !== undefined) {
let num = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
let maxTrialNumPerGpu: number = rmMeta.maxTrialNumPerGpu? rmMeta.maxTrialNumPerGpu: 1;
if((num === undefined && (!rmMeta.useActiveGpu && gpuInfo.activeProcessNum === 0 || rmMeta.useActiveGpu)) ||
(num !== undefined && num < maxTrialNumPerGpu)) {
availableGPUs.push(gpuInfo);
}
} else {
throw new Error(`occupiedGpuIndexMap initialize error!`);
}
}
});
totalResourceMap.set(rmMeta, availableGPUs);
......@@ -170,14 +192,22 @@ export class GPUScheduler {
}
private allocateHost(requiredGPUNum: number, rmMeta: RemoteMachineMeta,
gpuInfos: GPUInfo[], trialJobId: string): RemoteMachineScheduleResult {
gpuInfos: GPUInfo[], trialJobDetail: RemoteMachineTrialJobDetail): RemoteMachineScheduleResult {
assert(gpuInfos.length >= requiredGPUNum);
const allocatedGPUs: GPUInfo[] = this.selectGPUsForTrial(gpuInfos, requiredGPUNum);
allocatedGPUs.forEach((gpuInfo: GPUInfo) => {
rmMeta.gpuReservation.set(gpuInfo.index, trialJobId);
if(rmMeta.occupiedGpuIndexMap !== undefined) {
let num = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if(num === undefined) {
num = 0;
}
rmMeta.occupiedGpuIndexMap.set(gpuInfo.index, num + 1);
}else {
throw new Error(`Machine ${rmMeta.ip} occupiedGpuIndexMap initialize error!`);
}
});
trialJobDetail.gpuIndices = allocatedGPUs;
trialJobDetail.rmMeta = rmMeta;
return {
resultType: ScheduleResultType.SUCCEED,
scheduleInfo: {
......
......@@ -23,7 +23,7 @@ import * as fs from 'fs';
import { Client, ConnectConfig } from 'ssh2';
import { Deferred } from 'ts-deferred';
import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService';
import { GPUSummary } from '../common/gpuData';
import { GPUSummary, GPUInfo } from '../common/gpuData';
/**
* Metadata of remote machine for configuration and status query
......@@ -36,20 +36,23 @@ export class RemoteMachineMeta {
public readonly sshKeyPath?: string;
public readonly passphrase?: string;
public gpuSummary : GPUSummary | undefined;
// GPU Reservation info, the key is GPU index, the value is the job id which reserves this GPU
public gpuReservation : Map<number, string>;
public readonly gpuIndices?: string;
public readonly maxTrialNumPerGpu?: number;
public occupiedGpuIndexMap: Map<number, number>;
public readonly useActiveGpu?: boolean = false;
constructor(ip : string, port : number, username : string, passwd : string,
sshKeyPath: string, passphrase : string, gpuIndices?: string) {
sshKeyPath: string, passphrase : string, gpuIndices?: string, maxTrialNumPerGpu?: number, useActiveGpu?: boolean) {
this.ip = ip;
this.port = port;
this.username = username;
this.passwd = passwd;
this.sshKeyPath = sshKeyPath;
this.passphrase = passphrase;
this.gpuReservation = new Map<number, string>();
this.gpuIndices = gpuIndices;
this.maxTrialNumPerGpu = maxTrialNumPerGpu;
this.occupiedGpuIndexMap = new Map<number, number>();
this.useActiveGpu = useActiveGpu;
}
}
......@@ -97,6 +100,7 @@ export class RemoteMachineTrialJobDetail implements TrialJobDetail {
public sequenceId: number;
public rmMeta?: RemoteMachineMeta;
public isEarlyStopped?: boolean;
public gpuIndices: GPUInfo[];
constructor(id: string, status: TrialJobStatus, submitTime: number,
workingDirectory: string, form: JobApplicationForm, sequenceId: number) {
......@@ -107,6 +111,7 @@ export class RemoteMachineTrialJobDetail implements TrialJobDetail {
this.form = form;
this.sequenceId = sequenceId;
this.tags = [];
this.gpuIndices = []
}
}
......
......@@ -282,7 +282,7 @@ class RemoteMachineTrainingService implements TrainingService {
private updateGpuReservation() {
for (const [key, value] of this.trialJobsMap) {
if(!['WAITING', 'RUNNING'].includes(value.status)) {
this.gpuScheduler.removeGpuReservation(value.id, value.rmMeta);
this.gpuScheduler.removeGpuReservation(key, this.trialJobsMap);
}
};
}
......@@ -521,7 +521,7 @@ class RemoteMachineTrainingService implements TrainingService {
return deferred.promise;
}
// get an ssh client from scheduler
const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobId);
const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);
if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
const errorMessage : string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
this.log.error(errorMessage);
......@@ -542,6 +542,7 @@ class RemoteMachineTrainingService implements TrainingService {
trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialWorkingFolder}`;
trialJobDetail.startTime = Date.now();
this.trialJobsMap.set(trialJobId, trialJobDetail);
deferred.resolve(true);
} else if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);
......
......@@ -63,7 +63,9 @@ common_schema = {
Optional('advisor'): dict,
Optional('assessor'): dict,
Optional('localConfig'): {
Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!')
Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!'),
Optional('maxTrialNumPerGpu'): setType('maxTrialNumPerGpu', int),
Optional('useActiveGpu'): setType('useActiveGpu', bool)
}
}
tuner_schema_dict = {
......@@ -310,26 +312,30 @@ frameworkcontroller_config_schema = {
})
}
machine_list_schima = {
machine_list_schema = {
Optional('machineList'):[Or({
'ip': setType('ip', str),
Optional('port'): setNumberRange('port', int, 1, 65535),
'username': setType('username', str),
'passwd': setType('passwd', str),
Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!')
Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!'),
Optional('maxTrialNumPerGpu'): setType('maxTrialNumPerGpu', int),
Optional('useActiveGpu'): setType('useActiveGpu', bool)
},{
'ip': setType('ip', str),
Optional('port'): setNumberRange('port', int, 1, 65535),
'username': setType('username', str),
'sshKeyPath': setPathCheck('sshKeyPath'),
Optional('passphrase'): setType('passphrase', str),
Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!')
Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!'),
Optional('maxTrialNumPerGpu'): setType('maxTrialNumPerGpu', int),
Optional('useActiveGpu'): setType('useActiveGpu', bool)
})]
}
LOCAL_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema})
REMOTE_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema, **machine_list_schima})
REMOTE_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema, **machine_list_schema})
PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_schema})
......
......@@ -160,9 +160,13 @@ def set_local_config(experiment_config, port, config_file_name):
request_data = dict()
if experiment_config.get('localConfig'):
request_data['local_config'] = experiment_config['localConfig']
if request_data['local_config'] and request_data['local_config'].get('gpuIndices') \
and isinstance(request_data['local_config'].get('gpuIndices'), int):
request_data['local_config']['gpuIndices'] = str(request_data['local_config'].get('gpuIndices'))
if request_data['local_config']:
if request_data['local_config'].get('gpuIndices') and isinstance(request_data['local_config'].get('gpuIndices'), int):
request_data['local_config']['gpuIndices'] = str(request_data['local_config'].get('gpuIndices'))
if request_data['local_config'].get('maxTrialNumOnEachGpu'):
request_data['local_config']['maxTrialNumOnEachGpu'] = request_data['local_config'].get('maxTrialNumOnEachGpu')
if request_data['local_config'].get('useActiveGpu'):
request_data['local_config']['useActiveGpu'] = request_data['local_config'].get('useActiveGpu')
response = rest_put(cluster_metadata_url(port), json.dumps(request_data), REST_TIME_OUT)
err_message = ''
if not response or not check_response(response):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment