Unverified Commit 3bce6926 authored by Zhenhua Han's avatar Zhenhua Han Committed by GitHub
Browse files

extend reusable training service to support placement constraint (#3897)

parent b0de7c93
from dataclasses import dataclass
from typing import Literal
@dataclass
class GPUDevice:
    """A single GPU on a training node, identified by ``(node_id, gpu_id)``.

    ``status`` tracks scheduling availability ('idle'/'busy'/'unknown') and is
    deliberately excluded from equality, ordering and hashing, so the same
    physical device always compares equal regardless of its current state.
    """
    node_id: str
    gpu_id: int
    status: Literal['idle', 'busy', 'unknown'] = 'idle'

    def __eq__(self, o) -> bool:
        # Identity is the (node, gpu) pair only; status is intentionally ignored.
        return self.node_id == o.node_id and self.gpu_id == o.gpu_id

    def __lt__(self, o) -> bool:
        # Order by node first, then by GPU index within the node.
        return (self.node_id, self.gpu_id) < (o.node_id, o.gpu_id)

    def __repr__(self) -> str:
        return "{Environment %s, GPU %d, Status %s}" % (self.node_id, self.gpu_id, self.status)

    def __hash__(self) -> int:
        # Bug fix: gpu_id is an int, so it must be stringified before
        # concatenation (the original `self.node_id + '_' + self.gpu_id`
        # raised TypeError).  Kept consistent with __eq__: status excluded.
        return hash(self.node_id + '_' + str(self.gpu_id))

    def set_status(self, status):
        self.status = status
...@@ -12,6 +12,7 @@ __all__ = ['LocalConfig'] ...@@ -12,6 +12,7 @@ __all__ = ['LocalConfig']
@dataclass(init=False) @dataclass(init=False)
class LocalConfig(TrainingServiceConfig): class LocalConfig(TrainingServiceConfig):
platform: str = 'local' platform: str = 'local'
reuse_mode: bool = False
use_active_gpu: Optional[bool] = None use_active_gpu: Optional[bool] = None
max_trial_number_per_gpu: int = 1 max_trial_number_per_gpu: int = 1
gpu_indices: Union[List[int], str, int, None] = None gpu_indices: Union[List[int], str, int, None] = None
......
...@@ -14,6 +14,7 @@ from ..integration_api import send_trial, receive_trial_parameters, get_advisor ...@@ -14,6 +14,7 @@ from ..integration_api import send_trial, receive_trial_parameters, get_advisor
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
class BaseGraphData: class BaseGraphData:
def __init__(self, model_script: str, evaluator: Evaluator) -> None: def __init__(self, model_script: str, evaluator: Evaluator) -> None:
self.model_script = model_script self.model_script = model_script
......
...@@ -17,7 +17,7 @@ _logger = logging.getLogger(__name__) ...@@ -17,7 +17,7 @@ _logger = logging.getLogger(__name__)
class CGOExecutionEngine(AbstractExecutionEngine): class CGOExecutionEngine(AbstractExecutionEngine):
def __init__(self, n_model_per_graph=4) -> None: def __init__(self, devices=None, n_model_per_graph=4) -> None:
self._listeners: List[AbstractGraphListener] = [] self._listeners: List[AbstractGraphListener] = []
self._running_models: Dict[int, Model] = dict() self._running_models: Dict[int, Model] = dict()
self.logical_plan_counter = 0 self.logical_plan_counter = 0
...@@ -25,6 +25,7 @@ class CGOExecutionEngine(AbstractExecutionEngine): ...@@ -25,6 +25,7 @@ class CGOExecutionEngine(AbstractExecutionEngine):
self._optimizers = [DedupInputOptimizer()] self._optimizers = [DedupInputOptimizer()]
self._original_models = {} self._original_models = {}
self._original_model_to_multi_model = {} self._original_model_to_multi_model = {}
self.devices = [] if devices is None else devices
# register advisor callbacks # register advisor callbacks
advisor = get_advisor() advisor = get_advisor()
......
...@@ -25,6 +25,7 @@ from nni.experiment.config import util ...@@ -25,6 +25,7 @@ from nni.experiment.config import util
from nni.experiment.config.base import ConfigBase, PathLike from nni.experiment.config.base import ConfigBase, PathLike
from nni.experiment.pipe import Pipe from nni.experiment.pipe import Pipe
from nni.tools.nnictl.command_utils import kill_command from nni.tools.nnictl.command_utils import kill_command
from nni.common.device import GPUDevice
from ..codegen import model_to_pytorch_script from ..codegen import model_to_pytorch_script
from ..converter import convert_to_graph from ..converter import convert_to_graph
...@@ -193,13 +194,14 @@ class RetiariiExperiment(Experiment): ...@@ -193,13 +194,14 @@ class RetiariiExperiment(Experiment):
""" """
atexit.register(self.stop) atexit.register(self.stop)
devices = self._construct_devices()
# we will probably need a execution engine factory to make this clean and elegant # we will probably need a execution engine factory to make this clean and elegant
if self.config.execution_engine == 'base': if self.config.execution_engine == 'base':
from ..execution.base import BaseExecutionEngine from ..execution.base import BaseExecutionEngine
engine = BaseExecutionEngine() engine = BaseExecutionEngine()
elif self.config.execution_engine == 'cgo': elif self.config.execution_engine == 'cgo':
from ..execution.cgo_engine import CGOExecutionEngine from ..execution.cgo_engine import CGOExecutionEngine
engine = CGOExecutionEngine() engine = CGOExecutionEngine(devices = devices)
elif self.config.execution_engine == 'py': elif self.config.execution_engine == 'py':
from ..execution.python import PurePythonExecutionEngine from ..execution.python import PurePythonExecutionEngine
engine = PurePythonExecutionEngine() engine = PurePythonExecutionEngine()
...@@ -241,6 +243,17 @@ class RetiariiExperiment(Experiment): ...@@ -241,6 +243,17 @@ class RetiariiExperiment(Experiment):
_logger.info('Waiting for experiment to become DONE (you can ctrl+c if there is no running trial jobs)...') _logger.info('Waiting for experiment to become DONE (you can ctrl+c if there is no running trial jobs)...')
exp_status_checker.join() exp_status_checker.join()
def _construct_devices(self):
    """Collect the GPU devices declared by the training-service config.

    Returns
    -------
    list
        One ``GPUDevice`` per configured GPU index.  For a remote training
        service (detected by the presence of ``machine_list``) the device's
        node id is the machine host; for a local service it is ``'local'``.
    """
    training_service = self.config.training_service
    if hasattr(training_service, 'machine_list'):
        # Remote: one device per (machine, gpu index) pair.  The original
        # enumerate() index was unused and has been dropped.
        return [
            GPUDevice(machine.host, gpu_idx)
            for machine in training_service.machine_list
            # gpu_indices may be left unset (None) in the config; treat as empty.
            for gpu_idx in (machine.gpu_indices or [])
        ]
    # Local: every GPU lives on the single local host.
    return [GPUDevice('local', gpu_idx) for gpu_idx in (training_service.gpu_indices or [])]
def _create_dispatcher(self): def _create_dispatcher(self):
return self._dispatcher return self._dispatcher
......
...@@ -68,7 +68,27 @@ class RetiariiAdvisor(MsgDispatcherBase): ...@@ -68,7 +68,27 @@ class RetiariiAdvisor(MsgDispatcherBase):
self.handle_update_search_space(data) self.handle_update_search_space(data)
send(CommandType.Initialized, '') send(CommandType.Initialized, '')
def send_trial(self, parameters): def _validate_placement_constraint(self, placement_constraint):
if placement_constraint is None:
raise ValueError('placement_constraint is None')
if not 'type' in placement_constraint:
raise ValueError('placement_constraint must have `type`')
if not 'gpus' in placement_constraint:
raise ValueError('placement_constraint must have `gpus`')
if placement_constraint['type'] not in ['None', 'GPUNumber', 'Device']:
raise ValueError('placement_constraint.type must be either `None`,. `GPUNumber` or `Device`')
if placement_constraint['type'] == 'None' and len(placement_constraint['gpus']) > 0:
raise ValueError('placement_constraint.gpus must be an empty list when type == None')
if placement_constraint['type'] == 'Device' and len(placement_constraint['gpus']) != 1:
raise ValueError('placement_constraint.gpus must be a list of number (currently only support one host)')
if placement_constraint['type'] == 'Device':
for e in placement_constraint['gpus']:
if not isinstance(e, tuple):
raise ValueError('placement_constraint.gpus must be a list of tuple when type == Device')
if not (len(e) == 2 and isinstance(e[0], str) and isinstance(e[1], int)):
raise ValueError('placement_constraint.gpus`s tuple must be (str, int)')
def send_trial(self, parameters, placement_constraint=None):
""" """
Send parameters to NNI. Send parameters to NNI.
...@@ -84,10 +104,17 @@ class RetiariiAdvisor(MsgDispatcherBase): ...@@ -84,10 +104,17 @@ class RetiariiAdvisor(MsgDispatcherBase):
which will be used for identification in future. which will be used for identification in future.
""" """
self.parameters_count += 1 self.parameters_count += 1
if placement_constraint is None:
placement_constraint = {
'type': 'None',
'gpus': []
}
self._validate_placement_constraint(placement_constraint)
new_trial = { new_trial = {
'parameter_id': self.parameters_count, 'parameter_id': self.parameters_count,
'parameters': parameters, 'parameters': parameters,
'parameter_source': 'algorithm' 'parameter_source': 'algorithm',
'placement_constraint': placement_constraint
} }
_logger.debug('New trial sent: %s', new_trial) _logger.debug('New trial sent: %s', new_trial)
send(CommandType.NewTrialJob, json_dumps(new_trial)) send(CommandType.NewTrialJob, json_dumps(new_trial))
......
...@@ -27,12 +27,12 @@ def register_advisor(advisor: 'RetiariiAdvisor'): ...@@ -27,12 +27,12 @@ def register_advisor(advisor: 'RetiariiAdvisor'):
_advisor = advisor _advisor = advisor
def send_trial(parameters: dict) -> int: def send_trial(parameters: dict, placement_constraint=None) -> int:
""" """
Send a new trial. Executed on tuner end. Send a new trial. Executed on tuner end.
Return a ID that is the unique identifier for this trial. Return a ID that is the unique identifier for this trial.
""" """
return get_advisor().send_trial(parameters) return get_advisor().send_trial(parameters, placement_constraint)
def receive_trial_parameters() -> dict: def receive_trial_parameters() -> dict:
......
...@@ -242,4 +242,4 @@ class MsgDispatcherBase(Recoverable): ...@@ -242,4 +242,4 @@ class MsgDispatcherBase(Recoverable):
hyper_params: the string that is sent by message dispatcher during the creation of trials. hyper_params: the string that is sent by message dispatcher during the creation of trials.
""" """
raise NotImplementedError('handle_trial_end not implemented') raise NotImplementedError('handle_trial_end not implemented')
\ No newline at end of file
lightning_logs
data
\ No newline at end of file
...@@ -45,4 +45,4 @@ if __name__ == '__main__': ...@@ -45,4 +45,4 @@ if __name__ == '__main__':
exp_config.training_service.use_active_gpu = True exp_config.training_service.use_active_gpu = True
exp_config.training_service.gpu_indices = [1, 2] exp_config.training_service.gpu_indices = [1, 2]
exp.run(exp_config, 8081) exp.run(exp_config, 8081)
\ No newline at end of file
import json
from nni.common.device import GPUDevice
import os
import sys
import torch
from pathlib import Path
import nni.retiarii.evaluator.pytorch.lightning as pl
import nni.retiarii.strategy as strategy
from nni.experiment import RemoteMachineConfig
from nni.retiarii import serialize
from nni.retiarii.experiment.pytorch import RetiariiExperiment, RetiariiExeConfig
from torchvision import transforms
from torchvision.datasets import CIFAR10
from darts_model import CNN
if __name__ == '__main__':
    # Base search space: DARTS-style CNN.  Arguments presumably are
    # (input_size, in_channels, init_channels, n_classes, n_layers) — TODO
    # confirm against darts_model.CNN's signature.
    base_model = CNN(32, 3, 16, 10, 8)

    # Standard CIFAR-10 augmentation for training; eval uses normalization only.
    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    valid_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # serialize() wraps the dataset constructor so NNI can re-create the
    # datasets inside trial processes.
    train_dataset = serialize(CIFAR10, root='data/cifar10', train=True, download=True, transform=train_transform)
    test_dataset = serialize(CIFAR10, root='data/cifar10', train=False, download=True, transform=valid_transform)

    # Lightning-based classification evaluator; limit_train_batches=0.2 keeps
    # each one-epoch trial short.
    trainer = pl.Classification(train_dataloader=pl.DataLoader(train_dataset, batch_size=100),
                                val_dataloaders=pl.DataLoader(test_dataset, batch_size=100),
                                max_epochs=1, limit_train_batches=0.2)

    # Random search over the space defined by base_model.
    simple_strategy = strategy.Random()

    exp = RetiariiExperiment(base_model, trainer, [], simple_strategy)

    # Remote training service with reuse mode enabled (reusable environments).
    exp_config = RetiariiExeConfig('remote')
    exp_config.experiment_name = 'darts_search'
    exp_config.trial_concurrency = 2
    exp_config.max_trial_number = 10
    exp_config.trial_gpu_number = 1
    exp_config.training_service.use_active_gpu = True
    exp_config.training_service.reuse_mode = True
    exp_config.training_service.gpu_indices = [0, 1, 2]

    # Single remote machine; replace host/user/password/python_path with real
    # credentials before running.
    rm_conf = RemoteMachineConfig()
    rm_conf.host = '127.0.0.1'
    rm_conf.user = 'xxx'
    rm_conf.password = 'xxx'
    rm_conf.port = 22
    rm_conf.python_path = '/home/xxx/py38/bin'
    rm_conf.gpu_indices = [0, 1, 2]
    rm_conf.use_active_gpu = True
    rm_conf.max_trial_number_per_gpu = 3
    exp_config.training_service.machine_list = [rm_conf]

    # Use the pure-Python execution engine; web UI on port 8081.
    exp_config.execution_engine = 'py'
    exp.run(exp_config, 8081)
...@@ -16,6 +16,7 @@ export interface TrainingServiceConfig { ...@@ -16,6 +16,7 @@ export interface TrainingServiceConfig {
export interface LocalConfig extends TrainingServiceConfig { export interface LocalConfig extends TrainingServiceConfig {
platform: 'local'; platform: 'local';
reuseMode: boolean;
useActiveGpu?: boolean; useActiveGpu?: boolean;
maxTrialNumberPerGpu: number; maxTrialNumberPerGpu: number;
gpuIndices?: number[]; gpuIndices?: number[];
......
...@@ -18,12 +18,31 @@ interface HyperParameters { ...@@ -18,12 +18,31 @@ interface HyperParameters {
readonly index: number; readonly index: number;
} }
type PlacementConstraintType = 'None' | 'GPUNumber' | 'Device'
interface PlacementConstraint{
readonly type: PlacementConstraintType;
readonly gpus: Array<number> | Array<[string,number]>;
/**
* GPUNumber constraint is in form of Array<number>, e.g., [3] means it must be placed on a node of 3 GPUs
*
* Device constraint is in form of Array<[string,number]>, e.g., [('Node-0',1),('Node-1',0)] means it must be placed on
* Node-0's GPU-1 and Node-1's GPU-0
*/
}
/** /**
* define TrialJobApplicationForm * define TrialJobApplicationForm
*/ */
interface TrialJobApplicationForm { interface TrialJobApplicationForm {
readonly sequenceId: number; readonly sequenceId: number;
readonly hyperParameters: HyperParameters; readonly hyperParameters: HyperParameters;
readonly placementConstraint?: PlacementConstraint;
}
interface TrialCommandContent {
readonly parameter_id: string;
readonly parameters: string;
readonly parameter_source: string;
readonly placement_constraint?: PlacementConstraint;
} }
/** /**
...@@ -101,5 +120,5 @@ class NNIManagerIpConfig { ...@@ -101,5 +120,5 @@ class NNIManagerIpConfig {
export { export {
TrainingService, TrainingServiceError, TrialJobStatus, TrialJobApplicationForm, TrainingService, TrainingServiceError, TrialJobStatus, TrialJobApplicationForm,
TrainingServiceMetadata, TrialJobDetail, TrialJobMetric, HyperParameters, TrainingServiceMetadata, TrialJobDetail, TrialJobMetric, HyperParameters,
NNIManagerIpConfig NNIManagerIpConfig, PlacementConstraint, TrialCommandContent
}; };
...@@ -19,7 +19,7 @@ import { ExperimentConfig, toSeconds, toCudaVisibleDevices } from '../common/exp ...@@ -19,7 +19,7 @@ import { ExperimentConfig, toSeconds, toCudaVisibleDevices } from '../common/exp
import { ExperimentManager } from '../common/experimentManager'; import { ExperimentManager } from '../common/experimentManager';
import { TensorboardManager } from '../common/tensorboardManager'; import { TensorboardManager } from '../common/tensorboardManager';
import { import {
TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, TrialCommandContent
} from '../common/trainingService'; } from '../common/trainingService';
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils'; import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
import { import {
...@@ -178,7 +178,7 @@ class NNIManager implements Manager { ...@@ -178,7 +178,7 @@ class NNIManager implements Manager {
this.config = config; this.config = config;
this.log.info(`Starting experiment: ${this.experimentProfile.id}`); this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
await this.storeExperimentProfile(); await this.storeExperimentProfile();
if (this.trainingService === undefined) { if (this.trainingService === undefined) {
this.log.info('Setup training service...'); this.log.info('Setup training service...');
this.trainingService = await this.initTrainingService(config); this.trainingService = await this.initTrainingService(config);
...@@ -449,8 +449,12 @@ class NNIManager implements Manager { ...@@ -449,8 +449,12 @@ class NNIManager implements Manager {
if (!platform) { if (!platform) {
throw new Error('Cannot detect training service platform'); throw new Error('Cannot detect training service platform');
} }
const reuseMode = Array.isArray(config.trainingService) || (config.trainingService as any).reuseMode;
if (platform === 'local') { if (reuseMode) {
const module_ = await import('../training_service/reusable/routerTrainingService');
return await module_.RouterTrainingService.construct(config);
} else if (platform === 'local') {
const module_ = await import('../training_service/local/localTrainingService'); const module_ = await import('../training_service/local/localTrainingService');
return new module_.LocalTrainingService(config); return new module_.LocalTrainingService(config);
} else if (platform === 'frameworkcontroller') { } else if (platform === 'frameworkcontroller') {
...@@ -542,13 +546,13 @@ class NNIManager implements Manager { ...@@ -542,13 +546,13 @@ class NNIManager implements Manager {
private async stopTrialJobIfOverMaxDurationTimer(trialJobId: string): Promise<void> { private async stopTrialJobIfOverMaxDurationTimer(trialJobId: string): Promise<void> {
const trialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId); const trialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
if(undefined !== trialJobDetail && if (undefined !== trialJobDetail &&
trialJobDetail.status === 'RUNNING' && trialJobDetail.status === 'RUNNING' &&
trialJobDetail.startTime !== undefined){ trialJobDetail.startTime !== undefined) {
const isEarlyStopped = true; const isEarlyStopped = true;
await this.trainingService.cancelTrialJob(trialJobId, isEarlyStopped); await this.trainingService.cancelTrialJob(trialJobId, isEarlyStopped);
this.log.info(`Trial job ${trialJobId} has stoped because it is over maxTrialDuration.`); this.log.info(`Trial job ${trialJobId} has stoped because it is over maxTrialDuration.`);
} }
} }
private async requestTrialJobsStatus(): Promise<number> { private async requestTrialJobsStatus(): Promise<number> {
...@@ -674,7 +678,7 @@ class NNIManager implements Manager { ...@@ -674,7 +678,7 @@ class NNIManager implements Manager {
this.currSubmittedTrialNum++; this.currSubmittedTrialNum++;
this.log.info('submitTrialJob: form:', form); this.log.info('submitTrialJob: form:', form);
const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form); const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
setTimeout(async ()=> this.stopTrialJobIfOverMaxDurationTimer(trialJobDetail.id), 1000 * this.maxTrialDuration); setTimeout(async () => this.stopTrialJobIfOverMaxDurationTimer(trialJobDetail.id), 1000 * this.maxTrialDuration);
const Snapshot: TrialJobDetail = Object.assign({}, trialJobDetail); const Snapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
await this.storeExperimentProfile(); await this.storeExperimentProfile();
this.trialJobs.set(trialJobDetail.id, Snapshot); this.trialJobs.set(trialJobDetail.id, Snapshot);
...@@ -747,7 +751,7 @@ class NNIManager implements Manager { ...@@ -747,7 +751,7 @@ class NNIManager implements Manager {
private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> { private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
this.log.debug('NNIManager received trial job metrics:', metric); this.log.debug('NNIManager received trial job metrics:', metric);
if (this.trialJobs.has(metric.id)){ if (this.trialJobs.has(metric.id)) {
await this.dataStore.storeMetricData(metric.id, metric.data); await this.dataStore.storeMetricData(metric.id, metric.data);
if (this.dispatcher === undefined) { if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup'); throw new Error('Error: tuner has not been setup');
...@@ -796,12 +800,14 @@ class NNIManager implements Manager { ...@@ -796,12 +800,14 @@ class NNIManager implements Manager {
this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set'); this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
this.setStatus('RUNNING'); this.setStatus('RUNNING');
} }
const trialRequestContent: TrialCommandContent = JSON.parse(content);
const form: TrialJobApplicationForm = { const form: TrialJobApplicationForm = {
sequenceId: this.experimentProfile.nextSequenceId++, sequenceId: this.experimentProfile.nextSequenceId++,
hyperParameters: { hyperParameters: {
value: content, value: content,
index: 0 index: 0
} },
placementConstraint: trialRequestContent.placement_constraint
}; };
this.waitingTrials.push(form); this.waitingTrials.push(form);
break; break;
......
...@@ -125,6 +125,10 @@ describe('Unit Test for RemoteMachineTrainingService', () => { ...@@ -125,6 +125,10 @@ describe('Unit Test for RemoteMachineTrainingService', () => {
hyperParameters: { hyperParameters: {
value: 'mock hyperparameters', value: 'mock hyperparameters',
index: 0 index: 0
},
placementConstraint: {
type: "None",
gpus: []
} }
}; };
const jobDetail: TrialJobDetail = await remoteMachineTrainingService.submitTrialJob(form); const jobDetail: TrialJobDetail = await remoteMachineTrainingService.submitTrialJob(form);
......
...@@ -248,6 +248,7 @@ export class RemoteEnvironmentService extends EnvironmentService { ...@@ -248,6 +248,7 @@ export class RemoteEnvironmentService extends EnvironmentService {
} }
environment.command = await this.getScript(environment); environment.command = await this.getScript(environment);
environment.useActiveGpu = rmMachineMeta.useActiveGpu;
return Promise.resolve(true); return Promise.resolve(true);
} }
} }
......
...@@ -4,10 +4,12 @@ ...@@ -4,10 +4,12 @@
'use strict'; 'use strict';
import * as assert from 'assert'; import * as assert from 'assert';
import { PlacementConstraint } from 'common/trainingService';
import { getLogger, Logger } from '../../common/log'; import { getLogger, Logger } from '../../common/log';
import { randomSelect } from '../../common/utils'; import { randomSelect } from '../../common/utils';
import { GPUInfo, ScheduleResultType } from '../common/gpuData'; import { GPUInfo, ScheduleResultType } from '../common/gpuData';
import { EnvironmentInformation } from './environment'; import { EnvironmentInformation } from './environment';
import { RemoteMachineEnvironmentInformation } from './remote/remoteConfig';
import { TrialDetail } from './trial'; import { TrialDetail } from './trial';
type SCHEDULE_POLICY_NAME = 'random' | 'round-robin' | 'recently-idle'; type SCHEDULE_POLICY_NAME = 'random' | 'round-robin' | 'recently-idle';
...@@ -51,46 +53,143 @@ export class GpuScheduler { ...@@ -51,46 +53,143 @@ export class GpuScheduler {
/** /**
* Schedule a machine according to the constraints (requiredGPUNum) * Schedule a machine according to the constraints (requiredGPUNum)
* @param requiredGPUNum required GPU number * @param defaultRequiredGPUNum the default required GPU number when constraint.type === 'None'
*/ */
public scheduleMachine(environments: EnvironmentInformation[], requiredGPUNum: number | undefined, trialDetail: TrialDetail): GpuScheduleResult { public scheduleMachine(environments: EnvironmentInformation[], constraint: PlacementConstraint,
if (requiredGPUNum === undefined) { defaultRequiredGPUNum: number | undefined, trialDetail: TrialDetail): GpuScheduleResult {
requiredGPUNum = 0; if (constraint.type == 'None' || constraint.type == 'GPUNumber') {
} let requiredGPUNum = 0;
assert(requiredGPUNum >= 0); if (constraint.type == 'None') {
// Step 1: Check if required GPU number not exceeds the total GPU number in all machines if (defaultRequiredGPUNum === undefined) {
const eligibleEnvironments: EnvironmentInformation[] = environments.filter((environment: EnvironmentInformation) => requiredGPUNum = 0;
environment.defaultGpuSummary === undefined || requiredGPUNum === 0 || (requiredGPUNum !== undefined && environment.defaultGpuSummary.gpuCount >= requiredGPUNum)); } else {
if (eligibleEnvironments.length === 0) { requiredGPUNum = defaultRequiredGPUNum;
// If the required gpu number exceeds the upper limit of all machine's GPU number }
// Return REQUIRE_EXCEED_TOTAL directly } else if (constraint.type == 'GPUNumber') {
return ({ const gpus = constraint.gpus as Array<number>;
resultType: ScheduleResultType.REQUIRE_EXCEED_TOTAL, // TODO: remove the following constraint when supporting distributed trial
if (gpus.length != 1) {
throw new Error("Placement constraint of GPUNumber must have exactly one number.");
}
requiredGPUNum = gpus[0];
}
assert(requiredGPUNum >= 0);
// Step 1: Check if required GPU number not exceeds the total GPU number in all machines
const eligibleEnvironments: EnvironmentInformation[] = environments.filter((environment: EnvironmentInformation) =>
environment.defaultGpuSummary === undefined || requiredGPUNum === 0 ||
(requiredGPUNum !== undefined && environment.defaultGpuSummary.gpuCount >= requiredGPUNum));
if (eligibleEnvironments.length === 0) {
// If the required gpu number exceeds the upper limit of all machine's GPU number
// Return REQUIRE_EXCEED_TOTAL directly
return ({
resultType: ScheduleResultType.REQUIRE_EXCEED_TOTAL,
gpuIndices: undefined,
environment: undefined,
});
}
// Step 2: Allocate Host/GPU for specified trial job
// Currenty the requireGPUNum parameter for all trial jobs are identical.
if (requiredGPUNum > 0) {
// Trial job requires GPU
const result: GpuScheduleResult | undefined = this.scheduleGPUHost(environments, requiredGPUNum, trialDetail);
if (result !== undefined) {
return result;
}
} else {
// Trail job does not need GPU
const allocatedRm: EnvironmentInformation = this.selectMachine(environments, environments);
return this.allocateHost(requiredGPUNum, allocatedRm, [], trialDetail);
}
return {
resultType: ScheduleResultType.TMP_NO_AVAILABLE_GPU,
gpuIndices: undefined, gpuIndices: undefined,
environment: undefined, environment: undefined,
}); };
} } else {
assert(constraint.type === 'Device')
if (constraint.gpus.length == 0) {
throw new Error("Device constraint is used but no device is specified.");
}
const gpus = constraint.gpus as Array<[string, number]>;
const selectedHost = gpus[0][0];
// Step 2: Allocate Host/GPU for specified trial job const hostsOfConstraint: Array<[string, number]> = gpus.filter((gpuTuple: [string, number]) => gpuTuple[0] === selectedHost);
// Currenty the requireGPUNum parameter for all trial jobs are identical. if (hostsOfConstraint.length > 1) {
if (requiredGPUNum > 0) { //TODO: remove this constraint when supporting multi-host placement
// Trial job requires GPU throw new Error("Device constraint does not support using multiple hosts")
const result: GpuScheduleResult | undefined = this.scheduleGPUHost(environments, requiredGPUNum, trialDetail);
if (result !== undefined) {
return result;
} }
} else { if (environments.length == 0) {
// Trail job does not need GPU return {
const allocatedRm: EnvironmentInformation = this.selectMachine(environments, environments); resultType: ScheduleResultType.TMP_NO_AVAILABLE_GPU,
gpuIndices: undefined,
environment: undefined,
};
}
for (const environment of environments) {
if(!('rmMachineMeta' in environment)){
//TODO: remove this constraint when supporting other training services
throw new Error(`Environment Device placement constraint only supports remote training service for now.`);
}
}
//TODO:
const eligibleEnvironments: EnvironmentInformation[] = environments.filter(
(environment: EnvironmentInformation) =>
(environment as RemoteMachineEnvironmentInformation).rmMachineMeta != undefined &&
(environment as RemoteMachineEnvironmentInformation).rmMachineMeta?.host == selectedHost);
if (eligibleEnvironments.length === 0) {
throw new Error(`The the required host (host: ${selectedHost}) is not found.`);
}
const selectedEnvironment = eligibleEnvironments[0];
const availableResources = this.gpuResourceDetection([selectedEnvironment]);
const selectedGPUs: Array<GPUInfo> = [];
return this.allocateHost(requiredGPUNum, allocatedRm, [], trialDetail); if (selectedEnvironment.defaultGpuSummary === undefined) {
} //GPU summary may not be ready, retry until it is ready
return {
resultType: ScheduleResultType.TMP_NO_AVAILABLE_GPU,
gpuIndices: undefined,
environment: undefined,
};
}
for (const gpuTuple of gpus) {
const gpuIdx: number = gpuTuple[1];
if (gpuIdx >= selectedEnvironment.defaultGpuSummary.gpuCount) {
throw new Error(`The gpuIdx of placement constraint ${gpuIdx} exceeds gpuCount of the host ${selectedHost}`);
}
return { if (availableResources.has(selectedEnvironment)) {
resultType: ScheduleResultType.TMP_NO_AVAILABLE_GPU, for (const gpuInfo of availableResources.get(selectedEnvironment)!) {
gpuIndices: undefined, if (gpuInfo.index === gpuIdx) {
environment: undefined, selectedGPUs.push(gpuInfo);
}; }
}
}
}
if (selectedGPUs.length === constraint.gpus.length) {
for (const gpuInfo of selectedGPUs) {
let num = selectedEnvironment.defaultGpuSummary?.assignedGpuIndexMap.get(gpuInfo.index);
if (num === undefined) {
num = 0;
}
selectedEnvironment.defaultGpuSummary?.assignedGpuIndexMap.set(gpuInfo.index, num + 1);
}
return {
resultType: ScheduleResultType.SUCCEED,
environment: selectedEnvironment,
gpuIndices: selectedGPUs,
};
} else {
return {
resultType: ScheduleResultType.TMP_NO_AVAILABLE_GPU,
gpuIndices: undefined,
environment: undefined,
};
}
}
} }
/** /**
...@@ -101,7 +200,7 @@ export class GpuScheduler { ...@@ -101,7 +200,7 @@ export class GpuScheduler {
trial.environment.defaultGpuSummary !== undefined && trial.environment.defaultGpuSummary !== undefined &&
trial.assignedGpus !== undefined && trial.assignedGpus !== undefined &&
trial.assignedGpus.length > 0) { trial.assignedGpus.length > 0) {
for (const gpuInfo of trial.assignedGpus) { for (const gpuInfo of trial.assignedGpus) {
const defaultGpuSummary = trial.environment.defaultGpuSummary; const defaultGpuSummary = trial.environment.defaultGpuSummary;
const num: number | undefined = defaultGpuSummary.assignedGpuIndexMap.get(gpuInfo.index); const num: number | undefined = defaultGpuSummary.assignedGpuIndexMap.get(gpuInfo.index);
...@@ -197,7 +296,7 @@ export class GpuScheduler { ...@@ -197,7 +296,7 @@ export class GpuScheduler {
throw new Error(`Unsupported schedule policy: ${this.policyName}`); throw new Error(`Unsupported schedule policy: ${this.policyName}`);
} }
} }
// Select the environment which is idle most recently. If all environments are not idle, use round robin to select an environment. // Select the environment which is idle most recently. If all environments are not idle, use round robin to select an environment.
private recentlyIdleSelect(qualifiedEnvironments: EnvironmentInformation[], allEnvironments: EnvironmentInformation[]): EnvironmentInformation { private recentlyIdleSelect(qualifiedEnvironments: EnvironmentInformation[], allEnvironments: EnvironmentInformation[]): EnvironmentInformation {
const now = Date.now(); const now = Date.now();
......
...@@ -535,8 +535,8 @@ class TrialDispatcher implements TrainingService { ...@@ -535,8 +535,8 @@ class TrialDispatcher implements TrainingService {
if (undefined === trial) { if (undefined === trial) {
throw new Error(`TrialDispatcher: waiting trial shouldn't be undefined!`); throw new Error(`TrialDispatcher: waiting trial shouldn't be undefined!`);
} }
const gpuNum = this.config.trialGpuNumber; const defaultGpuNum = this.config.trialGpuNumber;
const result = this.gpuScheduler.scheduleMachine(reusableEnvironments, gpuNum, trial); const result = this.gpuScheduler.scheduleMachine(reusableEnvironments, trial.form.placementConstraint!, defaultGpuNum, trial);
switch (result.resultType) { switch (result.resultType) {
case ScheduleResultType.REQUIRE_EXCEED_TOTAL: case ScheduleResultType.REQUIRE_EXCEED_TOTAL:
{ {
...@@ -546,7 +546,7 @@ class TrialDispatcher implements TrainingService { ...@@ -546,7 +546,7 @@ class TrialDispatcher implements TrainingService {
waitingTrials = []; waitingTrials = [];
this.isLoggedNoGpuAvailable = false; this.isLoggedNoGpuAvailable = false;
} else if (reusableEnvironments.length > 0) { } else if (reusableEnvironments.length > 0) {
const errorMessage: string = `TrialDispatcher: REQUIRE_EXCEED_TOTAL Required GPU number ${gpuNum} is too large, no machine can meet`; const errorMessage: string = `TrialDispatcher: REQUIRE_EXCEED_TOTAL Required GPU number ${defaultGpuNum} is too large, no machine can meet`;
this.log.error(errorMessage); this.log.error(errorMessage);
throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage); throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
} else { } else {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment