Unverified commit a73b1d59, authored by Sourab Mangrulkar and committed by GitHub

Integrate DeepSpeed and gradient accumulation via Accelerate (#23236)

* mixed precision support via accelerate

* fix issues

* fix for the sharded ddp case

* fix flax and tf failing tests

* refactor the place to create `Accelerator` object

* move ddp prep to accelerate

* fix 😅

* resolving comments

* move fsdp handling to accelerate

* fixes

* fix saving

* shift torch dynamo handling to accelerate

* shift deepspeed integration and save & load utils to accelerate

* fix accelerate launcher support

* oops

* fix 🐛

* save ckpt fix

* Trigger CI

* nasty 🐛 😅

* as deepspeed needs grad_acc fixes, transfer grad_acc to accelerate (see the sketch after the commit header)

* make tests happy

* quality 

* loss tracked needs to account for grad_acc

* fixing the deepspeed tests

* quality 

* 😅😅😅

* tests 😡

* quality 



* Trigger CI

* resolve comments and fix the issue with the previous merge from branch

* Trigger CI

* accelerate took over deepspeed integration

---------
Co-authored-by: Stas Bekman <stas@stason.org>
parent 88f50a1e
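
Several bullets above mention moving gradient accumulation and loss tracking into Accelerate. In Accelerate's API this is driven by constructing the `Accelerator` with `gradient_accumulation_steps` and wrapping each training step in `accelerator.accumulate(...)`; the sketch below shows that pattern in isolation (illustrative code with toy stand-ins, not the Trainer's actual implementation).

```python
import torch
from torch import nn
from accelerate import Accelerator

# Minimal sketch of Accelerate-managed gradient accumulation (illustrative only;
# the model, optimizer and data below are toy stand-ins, not Trainer internals).
model = nn.Linear(4, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
dataloader = torch.utils.data.DataLoader(torch.randn(32, 4), batch_size=4)

accelerator = Accelerator(gradient_accumulation_steps=4)
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

for batch in dataloader:
    # Inside accumulate(), gradients are synchronized and the optimizer stepped
    # only every `gradient_accumulation_steps`-th micro-batch.
    with accelerator.accumulate(model):
        loss = model(batch).pow(2).mean()
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
```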
......@@ -41,7 +41,7 @@ body:
Integrations:
- deepspeed: HF Trainer: @stas00, Accelerate: @pacman100
- deepspeed: HF Trainer/Accelerate: @pacman100
- ray/raytune: @richardliaw, @amogkam
- Big Model Inference: @sgugger @muellerzr
......
......@@ -55,7 +55,7 @@ Library:
Integrations:
- deepspeed: HF Trainer: @stas00, Accelerate: @pacman100
- deepspeed: HF Trainer/Accelerate: @pacman100
- ray/raytune: @richardliaw, @amogkam
Documentation: @sgugger, @stevhliu and @MKhalusova
......
......@@ -17,7 +17,6 @@ Integration with Deepspeed
import importlib.util
import weakref
from copy import deepcopy
from functools import partialmethod
from .dependency_versions_check import dep_version_check
......@@ -256,10 +255,12 @@ def deepspeed_config():
return None
def deepspeed_optim_sched(trainer, hf_deepspeed_config, args, num_training_steps):
def deepspeed_optim_sched(trainer, hf_deepspeed_config, args, num_training_steps, model_parameters):
"""
A convenience wrapper that deals with optimizer and lr scheduler configuration.
"""
from accelerate.utils import DummyOptim, DummyScheduler
config = hf_deepspeed_config.config
# Optimizer + Scheduler
......@@ -267,13 +268,13 @@ def deepspeed_optim_sched(trainer, hf_deepspeed_config, args, num_training_steps
# 1. DS scheduler + DS optimizer: Yes
# 2. HF scheduler + HF optimizer: Yes
# 3. DS scheduler + HF optimizer: Yes
# 4. HF scheduler + DS optimizer: Yes
# 4. HF scheduler + DS optimizer: No
#
# Unless Offload is enabled in which case it's:
# 1. DS scheduler + DS optimizer: Yes
# 2. HF scheduler + HF optimizer: Mostly*
# 3. DS scheduler + HF optimizer: Mostly*
# 4. HF scheduler + DS optimizer: Yes
# 4. HF scheduler + DS optimizer: No
#
# Mostly*: All non-native DeepSpeed optimizers that have both CPU and GPU implementation should work (except LAMB)
......@@ -284,6 +285,7 @@ def deepspeed_optim_sched(trainer, hf_deepspeed_config, args, num_training_steps
"--adafactor was passed, but also found `optimizer` configured in the DeepSpeed config. "
"Only one optimizer can be configured."
)
optimizer = DummyOptim(params=model_parameters)
else:
if hf_deepspeed_config.is_offload():
logger.info(
......@@ -297,21 +299,21 @@ def deepspeed_optim_sched(trainer, hf_deepspeed_config, args, num_training_steps
# To use other optimizers requires voiding warranty with: `zero_allow_untested_optimizer`
config["zero_allow_untested_optimizer"] = True
def _lr_scheduler_callable(optimizer):
return trainer.create_scheduler(num_training_steps=num_training_steps, optimizer=optimizer)
lr_scheduler = None
if "scheduler" not in config:
if optimizer is None:
# Optimizer is not available, so use callable to defer lr_scheduler creation to DS init
lr_scheduler = _lr_scheduler_callable
else:
lr_scheduler = trainer.create_scheduler(num_training_steps=num_training_steps, optimizer=optimizer)
if "scheduler" in config:
lr_scheduler = DummyScheduler(optimizer)
else:
if isinstance(optimizer, DummyOptim):
raise ValueError(
"Found `optimizer` configured in the DeepSpeed config, but no `scheduler`. "
"Please configure a scheduler in the DeepSpeed config."
)
lr_scheduler = trainer.create_scheduler(num_training_steps=num_training_steps, optimizer=optimizer)
return optimizer, lr_scheduler
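
To make the combination table above concrete: with a DeepSpeed-configured optimizer the Trainer now hands Accelerate a `DummyOptim`, a DeepSpeed-configured scheduler becomes a `DummyScheduler`, and pairing an HF scheduler with a DS optimizer (case 4) is rejected with the `ValueError` shown above. A sketch of the four cases as DeepSpeed config fragments (the `type`/`params` values are illustrative, not taken from this diff):

```python
# Illustrative DeepSpeed config fragments; "auto" lets the HF Trainer fill values in.
ds_optimizer = {"type": "AdamW", "params": {"lr": "auto"}}
ds_scheduler = {"type": "WarmupLR", "params": {"warmup_num_steps": "auto"}}

case_1 = {"optimizer": ds_optimizer, "scheduler": ds_scheduler}  # 1. DS scheduler + DS optimizer: supported
case_2 = {}                                                      # 2. HF scheduler + HF optimizer: supported (Trainer creates both)
case_3 = {"scheduler": ds_scheduler}                             # 3. DS scheduler + HF optimizer: supported (DummyScheduler wraps it)
case_4 = {"optimizer": ds_optimizer}                             # 4. HF scheduler + DS optimizer: raises the ValueError above
```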
def deepspeed_init(trainer, num_training_steps, resume_from_checkpoint=None, inference=False):
def deepspeed_init(trainer, num_training_steps, inference=False):
"""
Init DeepSpeed, after updating the DeepSpeed configuration with any relevant Trainer's args.
......@@ -323,28 +325,22 @@ def deepspeed_init(trainer, num_training_steps, resume_from_checkpoint=None, inf
resume_from_checkpoint: path to a checkpoint if to resume from after normal DeepSpeedEngine load
inference: launch in inference mode (no optimizer and no lr scheduler)
Returns: model, optimizer, lr_scheduler
Returns: optimizer, lr_scheduler
We may use `deepspeed_init` more than once during the life of Trainer, when we do - it's a temp hack based on:
https://github.com/microsoft/DeepSpeed/issues/1394#issuecomment-937405374 until Deepspeed fixes a bug where it
can't resume from a checkpoint after it did some stepping https://github.com/microsoft/DeepSpeed/issues/1612
"""
import deepspeed
from deepspeed.utils import logger as ds_logger
model = trainer.model
args = trainer.args
if hasattr(trainer, "hf_deepspeed_config_orig"):
hf_deepspeed_config = deepcopy(trainer.hf_deepspeed_config_orig)
else:
hf_deepspeed_config = args.hf_deepspeed_config
trainer.hf_deepspeed_config_orig = deepcopy(args.hf_deepspeed_config)
hf_deepspeed_config = trainer.accelerator.state.deepspeed_plugin.hf_ds_config
# resume config update - some bits like `model` and `num_training_steps` only become available during train
hf_deepspeed_config.trainer_config_finalize(args, model, num_training_steps)
config = hf_deepspeed_config.config
# set the Deepspeed log level consistent with the Trainer
ds_logger.setLevel(args.get_process_log_level())
......@@ -361,40 +357,33 @@ def deepspeed_init(trainer, num_training_steps, resume_from_checkpoint=None, inf
model_parameters = None
else:
trainer.optimizer = None # important for when deepspeed_init is used as re-init
optimizer, lr_scheduler = deepspeed_optim_sched(trainer, hf_deepspeed_config, args, num_training_steps)
model_parameters = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer, lr_scheduler = deepspeed_optim_sched(
trainer, hf_deepspeed_config, args, num_training_steps, model_parameters
)
# keep for quick debug:
# from pprint import pprint; pprint(config)
kwargs = {
"model": model,
"model_parameters": model_parameters,
"config_params": config,
"optimizer": optimizer,
"lr_scheduler": lr_scheduler,
}
deepspeed_engine, optimizer, _, lr_scheduler = deepspeed.initialize(**kwargs)
if resume_from_checkpoint is not None:
# it's possible that the user is trying to resume from model_path, which doesn't necessarily
# contain a deepspeed checkpoint. e.g. examples just check if the dir exists and assume it's
# a resume from a checkpoint and not just a local pretrained weight. So we check here if the
# path contains what looks like a deepspeed checkpoint
import glob
deepspeed_checkpoint_dirs = sorted(glob.glob(f"{resume_from_checkpoint}/global_step*"))
if len(deepspeed_checkpoint_dirs) > 0:
logger.info(f"Attempting to resume from {resume_from_checkpoint}")
# this magically updates self.optimizer and self.lr_scheduler
load_path, _ = deepspeed_engine.load_checkpoint(
resume_from_checkpoint, load_optimizer_states=True, load_lr_scheduler_states=True
)
if load_path is None:
raise ValueError(f"[deepspeed] failed to resume from checkpoint {resume_from_checkpoint}")
else:
raise ValueError(f"Can't find a valid checkpoint at {resume_from_checkpoint}")
return optimizer, lr_scheduler
return deepspeed_engine, optimizer, lr_scheduler
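
`deepspeed_init` therefore no longer builds the `DeepSpeedEngine` or handles resume itself: it only finalizes the config and returns the (possibly dummy) optimizer and scheduler, and the engine is created when Accelerate prepares the model. Roughly, the call pattern becomes the following (a sketch under that assumption, not the literal Trainer code):

```python
# Sketch: `trainer` is a Trainer with a DeepSpeed config, `max_steps` the
# computed number of training steps.
optimizer, lr_scheduler = deepspeed_init(trainer, num_training_steps=max_steps)

# The DeepSpeedEngine is now created inside Accelerate during prepare():
model, optimizer, lr_scheduler = trainer.accelerator.prepare(
    trainer.model, optimizer, lr_scheduler
)
```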
def deepspeed_load_checkpoint(deepspeed_engine, checkpoint_path):
# it's possible that the user is trying to resume from model_path, which doesn't necessarily
# contain a deepspeed checkpoint. e.g. examples just check if the dir exists and assume it's
# a resume from a checkpoint and not just a local pretrained weight. So we check here if the
# path contains what looks like a deepspeed checkpoint
import glob
deepspeed_checkpoint_dirs = sorted(glob.glob(f"{checkpoint_path}/global_step*"))
if len(deepspeed_checkpoint_dirs) > 0:
logger.info(f"Attempting to resume from {checkpoint_path}")
# this magically updates self.optimizer and self.lr_scheduler
load_path, _ = deepspeed_engine.load_checkpoint(
checkpoint_path, load_optimizer_states=True, load_lr_scheduler_states=True
)
if load_path is None:
raise ValueError(f"[deepspeed] failed to resume from checkpoint {checkpoint_path}")
else:
raise ValueError(f"Can't find a valid checkpoint at {checkpoint_path}")
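
Resumption is split out into the new `deepspeed_load_checkpoint` helper, which can be called once the engine exists (i.e. after `prepare()`). A short usage sketch, with `checkpoint_dir` as an assumed, user-supplied path:

```python
# Sketch: `deepspeed_engine` is the DeepSpeed-wrapped model returned by
# accelerator.prepare(); `checkpoint_dir` is expected to contain DeepSpeed's
# global_step* sub-directories, otherwise a ValueError is raised.
if checkpoint_dir is not None:
    deepspeed_load_checkpoint(deepspeed_engine, checkpoint_dir)
```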
......@@ -112,6 +112,10 @@ from .utils import (
)
if is_accelerate_available():
from accelerate.state import AcceleratorState, PartialState
SMALL_MODEL_IDENTIFIER = "julien-c/bert-xsmall-dummy"
DUMMY_UNKNOWN_IDENTIFIER = "julien-c/dummy-unknown"
DUMMY_DIFF_TOKENIZER_IDENTIFIER = "julien-c/dummy-diff-tokenizer"
......@@ -1331,6 +1335,9 @@ class TestCasePlus(unittest.TestCase):
for path in self.teardown_tmp_dirs:
shutil.rmtree(path, ignore_errors=True)
self.teardown_tmp_dirs = []
if is_accelerate_available():
AcceleratorState._reset_state()
PartialState._reset_state()
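
`AcceleratorState` and `PartialState` are process-wide singletons, so DeepSpeed or distributed settings configured in one test would otherwise leak into the next; resetting them in `tearDown` keeps test cases isolated. A minimal sketch of the same pattern outside `TestCasePlus`:

```python
import unittest

from accelerate.state import AcceleratorState, PartialState


class MyAccelerateTest(unittest.TestCase):
    def tearDown(self):
        super().tearDown()
        # Reset the shared accelerate state so the next test starts from scratch.
        AcceleratorState._reset_state()
        PartialState._reset_state()
```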
def mockenv(**kwargs):
......
This diff is collapsed.
......@@ -838,7 +838,7 @@ class IterableDatasetShard(IterableDataset):
def _get_learning_rate(self):
if self.deepspeed:
if self.is_deepspeed_enabled:
# with deepspeed's fp16 and dynamic loss scale enabled the optimizer/scheduler steps may
# not run for the first few dozen steps while loss scale is too large, and thus during
# that time `get_last_lr` will fail if called during that warm up stage, so work around it:
......
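
The comment above describes the workaround rather than showing it: under fp16 dynamic loss scaling DeepSpeed may skip the first optimizer/scheduler steps, so `get_last_lr()` can fail during that warm-up window. A sketch of what such a guard can look like (simplified, assuming the failure surfaces as an assertion mentioning `step`; not the full method):

```python
import logging

logger = logging.getLogger(__name__)


def _get_learning_rate(self):
    if self.is_deepspeed_enabled:
        # While the loss scale is still being reduced, the scheduler may not have
        # stepped yet and get_last_lr() can raise; report lr=0 in that window.
        try:
            last_lr = self.lr_scheduler.get_last_lr()[0]
        except AssertionError as e:
            if "need to call step" in str(e):
                logger.warning("tried to get lr value before scheduler/optimizer started stepping, returning lr=0")
                last_lr = 0
            else:
                raise
    else:
        last_lr = self.lr_scheduler.get_last_lr()[0]
    return last_lr
```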
......@@ -64,7 +64,7 @@ if is_torch_available():
import torch.distributed as dist
if is_accelerate_available():
from accelerate import PartialState
from accelerate.state import AcceleratorState, PartialState
from accelerate.utils import DistributedType
if is_torch_tpu_available(check_device=False):
......@@ -1550,6 +1550,7 @@ class TrainingArguments:
if isinstance(self.debug, str):
self.debug = [DebugOption(s) for s in self.debug.split()]
self.deepspeed_plugin = None
if self.deepspeed:
# - must be run very last in arg parsing, since it will use a lot of these settings.
# - must be run before the model is created.
......@@ -1562,6 +1563,12 @@ class TrainingArguments:
self.hf_deepspeed_config = HfTrainerDeepSpeedConfig(self.deepspeed)
self.hf_deepspeed_config.trainer_config_process(self)
# Accelerate DeepSpeed Plugin
from accelerate.utils import DeepSpeedPlugin
os.environ["ACCELERATE_USE_DEEPSPEED"] = "true"
self.deepspeed_plugin = DeepSpeedPlugin(hf_ds_config=self.hf_deepspeed_config)
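
With the lines above, `TrainingArguments` no longer leaves DeepSpeed wiring to a direct `deepspeed.initialize` call in the Trainer: it wraps the parsed HF DeepSpeed config in Accelerate's `DeepSpeedPlugin` and sets `ACCELERATE_USE_DEEPSPEED`, so the `Accelerator` created inside the Trainer picks the plugin up. Standalone Accelerate usage follows the same shape; a sketch with an assumed config path:

```python
import os

from accelerate import Accelerator
from accelerate.utils import DeepSpeedPlugin

# "ds_config_zero2.json" is an assumed, user-provided DeepSpeed config file.
os.environ["ACCELERATE_USE_DEEPSPEED"] = "true"
deepspeed_plugin = DeepSpeedPlugin(hf_ds_config="ds_config_zero2.json")
accelerator = Accelerator(deepspeed_plugin=deepspeed_plugin)
```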
if self.push_to_hub_token is not None:
warnings.warn(
"`--push_to_hub_token` is deprecated and will be removed in version 5 of 🤗 Transformers. Use "
......@@ -1660,6 +1667,8 @@ class TrainingArguments:
def _setup_devices(self) -> "torch.device":
requires_backends(self, ["torch"])
logger.info("PyTorch: setting up devices")
AcceleratorState._reset_state()
PartialState._reset_state()
if not is_sagemaker_mp_enabled() and not is_accelerate_available(check_partial_state=True):
raise ImportError(
"Using the `Trainer` with `PyTorch` requires `accelerate>=0.19.0`: Please run `pip install transformers[torch]` or `pip install accelerate -U`"
......
......@@ -365,16 +365,19 @@ class TrainerIntegrationDeepSpeed(TrainerIntegrationDeepSpeedWithCustomConfig, T
self.assertNotEqual(new_a, a)
def test_hf_scheduler_ds_optimizer(self):
a = 0
with mockenv_context(**self.dist_env_1_gpu):
ds_config_zero2_dict = self.get_config_dict(ZERO2)
del ds_config_zero2_dict["scheduler"] # force default HF Trainer scheduler
ds_config_zero2_dict["zero_optimization"]["offload_optimizer"]["device"] = "none"
ds_config_zero2_dict["fp16"]["initial_scale_power"] = 1 # force optimizer on the first step
trainer = get_regression_trainer(local_rank=0, fp16=True, deepspeed=ds_config_zero2_dict)
trainer.train()
new_a = trainer.model.a.item()
self.assertNotEqual(new_a, a)
with self.assertRaises(Exception) as context:
trainer.train()
self.assertIn(
"Found `optimizer` configured in the DeepSpeed config, but no `scheduler`. "
"Please configure a scheduler in the DeepSpeed config.",
str(context.exception),
)
@require_deepspeed_aio
def test_stage3_nvme_offload(self):
......@@ -751,6 +754,8 @@ class TrainerIntegrationDeepSpeed(TrainerIntegrationDeepSpeedWithCustomConfig, T
config = deepspeed_config()
self.assertTrue(bool(config), "Deepspeed config should be accessible")
# with the accelerate integration, the line below is additionally required for this test to pass
trainer.accelerator.state._reset_state()
del trainer
# now weakref should gc the global and we shouldn't get anything here
config = deepspeed_config()
......@@ -783,8 +788,8 @@ class TrainerIntegrationDeepSpeed(TrainerIntegrationDeepSpeedWithCustomConfig, T
with mockenv_context(**self.dist_env_1_gpu):
args_dict = {
"per_gpu_train_batch_size": 1,
"per_gpu_eval_batch_size": 1,
"per_device_train_batch_size": 1,
"per_device_eval_batch_size": 1,
"gradient_accumulation_steps": 1,
"learning_rate": 1e-4,
"num_train_epochs": 1,
......