Unverified Commit 5a0abc65 authored by Samyam Rajbhandari, committed by GitHub

Samyamr/batchconfig (#33)

* simplifying the batch config, using a single assert to test for validity and allowing for specifying only the micro batch size

* Simplifying Batch Config, Adding ability to specify batch using just micro_batch, and adding a bunch of unit tests

* ran formatting

* Typo fixes and added the config file

* reformatting

* path fixes

* removing print statements
parent 073da729
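The change below boils the batch configuration down to a single invariant: train_batch_size must equal train_micro_batch_size_per_gpu * gradient_accumulation_steps * world_size, and any one missing value can be derived from the others. A minimal illustrative sketch of that relationship (plain Python, not DeepSpeed code; the numbers are made up):

# Illustrative sketch of the invariant enforced by this change (not DeepSpeed code).
# train_batch_size == micro_batch_per_gpu * gradient_accumulation_steps * world_size
world_size = 2          # assumed number of GPUs
micro_batch = 16        # train_micro_batch_size_per_gpu
grad_acc = 1            # gradient_accumulation_steps
train_batch = micro_batch * grad_acc * world_size    # -> 32

# Specifying only the micro batch size: the other two values derive as in the new code,
# with gradient_accumulation_steps defaulting to 1.
only_micro = 8
derived_train_batch = only_micro * world_size        # -> 16
assert derived_train_batch == only_micro * 1 * world_size
print(train_batch, derived_train_batch)              # 32 16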
...
@@ -270,60 +270,81 @@ class DeepSpeedConfig(object):
        self.tensorboard_output_path = get_tensorboard_output_path(param_dict)
        self.tensorboard_job_name = get_tensorboard_job_name(param_dict)

    def _batch_assertion(self):

        train_batch = self.train_batch_size
        micro_batch = self.train_micro_batch_size_per_gpu
        grad_acc = self.gradient_accumulation_steps

        assert train_batch > 0, \
            f'Train batch size: {train_batch} has to be greater than 0'

        assert micro_batch > 0, \
            f'Micro batch size per gpu: {micro_batch} has to be greater than 0'

        assert grad_acc > 0, \
            f'Gradient accumulation steps: {grad_acc} has to be greater than 0'

        assert train_batch == micro_batch * grad_acc * self.world_size, \
            (f'Check batch related parameters. Train_batch_size is not equal '
             'to micro_batch_per_gpu * gradient_acc_step * world_size: '
             f'{train_batch} != {micro_batch} * {grad_acc} * {self.world_size}')

    def _set_batch_related_parameters(self):

        train_batch = self.train_batch_size
        micro_batch = self.train_micro_batch_size_per_gpu
        grad_acc = self.gradient_accumulation_steps

        # all values are provided, nothing needs to be set
        if train_batch is not None and \
            micro_batch is not None and \
            grad_acc is not None:
            return

        # gradient_accumulation_steps needs to be set
        elif train_batch is not None and \
            micro_batch is not None:
            grad_acc = train_batch // micro_batch
            grad_acc = grad_acc // self.world_size
            self.gradient_accumulation_steps = grad_acc

        # micro_batch_per_gpu needs to be set
        elif train_batch is not None and \
            grad_acc is not None:
            micro_batch = train_batch // self.world_size
            micro_batch = micro_batch // grad_acc
            self.train_micro_batch_size_per_gpu = micro_batch

        # train_batch_size needs to be set
        elif micro_batch is not None and \
            grad_acc is not None:
            train_batch_size = micro_batch * grad_acc
            train_batch_size = train_batch_size * self.world_size
            self.train_batch_size = train_batch_size

        # only train_batch_size is given: gradient_accumulation_steps and micro_batch_per_gpu are derived
        elif train_batch is not None:
            self.gradient_accumulation_steps = 1
            self.train_micro_batch_size_per_gpu = train_batch // self.world_size

        # only micro_batch_per_gpu is given: train_batch_size and gradient_accumulation_steps are derived
        elif micro_batch is not None:
            self.train_batch_size = micro_batch * self.world_size
            self.gradient_accumulation_steps = 1

        # either none of the three parameters is provided, or only gradient_accumulation_steps is provided
        else:
            assert False, \
                'Either train_batch_size or micro_batch_per_gpu needs to be provided'

        print(
            f' After Train batch {self.train_batch_size} micro_batch {self.train_micro_batch_size_per_gpu} and grad_acc {self.gradient_accumulation_steps}'
        )

    def _configure_train_batch_size(self):
        self._set_batch_related_parameters()
        self._batch_assertion()

    def _do_sanity_check(self):
        self._do_error_check()
...
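Once the parameters are filled in, _batch_assertion reduces every configuration to one check: train_batch == micro_batch * grad_acc * world_size. A small standalone sketch of that check outside of DeepSpeed (values chosen to mirror a passing and a failing unit-test case further below):

# Standalone illustration of the single validity check (not the DeepSpeedConfig class itself).
def batch_params_consistent(train_batch, micro_batch, grad_acc, world_size):
    # Mirrors: assert train_batch == micro_batch * grad_acc * world_size
    return train_batch == micro_batch * grad_acc * world_size

print(batch_params_consistent(32, 16, 1, 2))   # True  -> passes the assertion
print(batch_params_consistent(33, 17, 2, 2))   # False -> 17 * 2 * 2 == 68 != 33, assertion fires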
@@ -92,8 +92,7 @@ gpt_options=" \
 "

 work_dir="../../../DeepSpeedExamples/Megatron-LM/"
-include_str=`seq 0 $(( $gpus - 1 )) | paste -sd "," -`
-run_cmd="(cd ${work_dir} && deepspeed -i localhost:${include_str} pretrain_gpt2.py ${gpt_options})"
+run_cmd="(cd ${work_dir} && deepspeed --num_gpus $gpus pretrain_gpt2.py ${gpt_options})"
 echo ${run_cmd}
 eval ${run_cmd}
...
{
"train_batch_size": 2,
"gradient_accumulation_steps": 1,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.00015,
"max_grad_norm": 1.0
}
},
"fp16": {
"enabled": true,
"loss_scale": 0
}
}
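This is the JSON file that the unit test below loads as tests/unit/ds_batch_config.json. A quick sanity check of its contents using only the standard json module (the world size of 2 matches the distributed test below; the derived micro batch simply restates the new derivation rule, it is not a DeepSpeed call):

import json

# Path taken from the unit test below; assumed to be relative to the repo root.
with open('tests/unit/ds_batch_config.json') as f:
    cfg = json.load(f)

world_size = 2  # the unit test below runs with world_size=2
# micro_batch_per_gpu is not given in the file, so under the new rules it derives as:
micro_batch = cfg['train_batch_size'] // (cfg['gradient_accumulation_steps'] * world_size)
print(cfg['train_batch_size'], cfg['gradient_accumulation_steps'], micro_batch)  # 2 1 1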
# A test on its own
import torch
import pytest
from common import distributed_test
import torch.distributed as dist

# A test on its own
import deepspeed
from deepspeed.pt.deepspeed_config import DeepSpeedConfig


def test_cuda():
...
@@ -13,3 +17,83 @@ def test_check_version():
    assert hasattr(deepspeed, "__git_hash__")
    assert hasattr(deepspeed, "__git_branch__")
    assert hasattr(deepspeed, "__version__")
def _run_batch_config(ds_config, train_batch=None, micro_batch=None, gas=None):
ds_config.train_batch_size = train_batch
ds_config.train_micro_batch_size_per_gpu = micro_batch
ds_config.gradient_accumulation_steps = gas
success = True
try:
ds_config._configure_train_batch_size()
except AssertionError:
success = False
return success
def _batch_assert(status, ds_config, batch, micro_batch, gas, success):
if not success:
assert not status
print("Failed but All is well")
return
assert ds_config.train_batch_size == batch
assert ds_config.train_micro_batch_size_per_gpu == micro_batch
assert ds_config.gradient_accumulation_steps == gas
print("All is well")
#Tests different batch config provided in deepspeed json file
@pytest.mark.parametrize('num_ranks,batch,micro_batch,gas,success',
[(2,32,16,1,True),
(2,32,8,2,True),
(2,33,17,2,False),
(2,32,18,1,False)]) # yapf: disable
def test_batch_config(num_ranks, batch, micro_batch, gas, success):
@distributed_test(world_size=2)
def _test_batch_config(num_ranks, batch, micro_batch, gas, success):
        assert dist.get_world_size() == num_ranks, \
            f'The test assumes a world size of {num_ranks}'
ds_batch_config = 'tests/unit/ds_batch_config.json'
ds_config = DeepSpeedConfig(ds_batch_config)
#test cases when all parameters are provided
status = _run_batch_config(ds_config,
train_batch=batch,
micro_batch=micro_batch,
gas=gas)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
#test cases when two out of three parameters are provided
status = _run_batch_config(ds_config, train_batch=batch, micro_batch=micro_batch)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
if success:
#when gas is provided with one more parameter
status = _run_batch_config(ds_config, train_batch=batch, gas=gas)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
status = _run_batch_config(ds_config, micro_batch=micro_batch, gas=gas)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
#test the case when only micro_batch or train_batch is provided
if gas == 1:
status = _run_batch_config(ds_config, micro_batch=micro_batch)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
status = _run_batch_config(ds_config, train_batch=batch)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
else:
#when only gas is provided
status = _run_batch_config(ds_config, gas=gas)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
#when gas is provided with something else and gas does not divide batch
if gas != 1:
status = _run_batch_config(ds_config, train_batch=batch, gas=gas)
_batch_assert(status, ds_config, batch, micro_batch, gas, success)
"""Run batch config test """
_test_batch_config(num_ranks, batch, micro_batch, gas, success)
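For reference, the pass/fail expectations in the parametrization above follow directly from the single invariant. A quick arithmetic check in plain Python, outside the test harness (world size is 2 in all cases):

# Why each parametrized case is expected to succeed or fail.
cases = [
    # (batch, micro_batch, gas, expected_success)
    (32, 16, 1, True),
    (32, 8, 2, True),
    (33, 17, 2, False),
    (32, 18, 1, False),
]
world_size = 2
for batch, micro, gas, expected in cases:
    consistent = (batch == micro * gas * world_size)
    assert consistent == expected, (batch, micro, gas)
    print(batch, micro, gas, 'ok' if consistent else 'inconsistent')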