Unverified commit e1ad8803, authored by Chunyang Wen, committed by GitHub

Add log util (#230)

* Add log util

* Replace all occurrences of print and logging

* Address formatting issues

* Disable propagate to avoid duplicate logs
parent 2a1c5db1
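
The pattern this commit applies across the tree is, roughly, the sketch below: modules stop calling print() or the root logging module directly and instead import the shared logger from deepspeed.pt.log_utils. The module and function name in the sketch (report_world_size) are illustrative only and do not appear in the diff.

# Illustrative sketch only -- a hypothetical module adopting the shared logger.
from deepspeed.pt.log_utils import logger

def report_world_size(world_size):
    # Before this commit: print("world size", world_size, flush=True)
    # After: route messages through the shared logger; it writes to stdout
    # with a consistent format and does not propagate to the root logger.
    logger.info("world size: %s", world_size)
    if world_size <= 0:
        logger.warning("world size %s looks invalid", world_size)
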
@@ -5,6 +5,7 @@ Copyright 2020 The Microsoft DeepSpeed Team
from deepspeed.pt.deepspeed_light import DeepSpeedLight
from deepspeed.pt.deepspeed_light import ADAM_OPTIMIZER, LAMB_OPTIMIZER
from deepspeed.pt.deepspeed_lr_schedules import add_tuning_arguments
+from deepspeed.pt.log_utils import logger
from deepspeed.pt.deepspeed_cuda import DeepSpeedTransformerLayer, DeepSpeedTransformerConfig
from deepspeed.pt.deepspeed_config import DeepSpeedConfig
@@ -82,11 +83,12 @@ def initialize(args,
* ``lr_scheduler``: Wrapped lr scheduler if user ``lr_scheduler`` is passed, or
  if ``lr_scheduler`` specified in JSON configuration. Otherwise ``None``.
"""
-print("DeepSpeed info: version={}, git-hash={}, git-branch={}".format(
-    __version__,
-    __git_hash__,
-    __git_branch__),
-      flush=True)
+logger.info(
+    "DeepSpeed info: version={}, git-hash={}, git-branch={}".format(
+        __version__,
+        __git_hash__,
+        __git_branch__),
+)
engine = DeepSpeedLight(args=args,
                        model=model,
......
@@ -22,6 +22,7 @@ from torch.cuda import _lazy_call, device as device_ctx_manager
from deepspeed.pt.deepspeed_timer import SynchronizedWallClockTimer as Timers
import torch.distributed as dist
from deepspeed.pt.deepspeed_config import DeepSpeedConfig
+from deepspeed.pt.log_utils import logger
#DeepSpeed Checkpointing Enabled or Disabled
deepspeed_checkpointing_enabled = False
@@ -58,20 +59,23 @@ def see_memory_usage(message, force=False):
return
#dist.barrier()
if dist.get_rank() == 0:
-print(message)
-print("Memory Allocated ",
-      torch.cuda.memory_allocated() / (1024 * 1024 * 1024),
-      "GigaBytes")
-print("Max Memory Allocated ",
-      torch.cuda.max_memory_allocated() / (1024 * 1024 * 1024),
-      "GigaBytes")
-print("Cache Allocated ",
-      torch.cuda.memory_cached() / (1024 * 1024 * 1024),
-      "GigaBytes")
-print("Max cache Allocated ",
-      torch.cuda.max_memory_cached() / (1024 * 1024 * 1024),
-      "GigaBytes")
-print(" ")
+logger.info(message)
+logger.info(
+    "Memory Allocated %s GigaBytes",
+    torch.cuda.memory_allocated() / (1024 * 1024 * 1024),
+)
+logger.info(
+    "Max Memory Allocated %s GigaBytes",
+    torch.cuda.max_memory_allocated() / (1024 * 1024 * 1024),
+)
+logger.info(
+    "Cache Allocated %s GigaBytes",
+    torch.cuda.memory_cached() / (1024 * 1024 * 1024),
+)
+logger.info(
+    "Max cache Allocated %s GigaBytes",
+    torch.cuda.max_memory_cached() / (1024 * 1024 * 1024),
+)
#input("Press Any Key To Continue ..")
@@ -240,15 +244,16 @@ def model_parallel_cuda_manual_seed(seed):
data_parallel_seed = seed
if torch.distributed.get_rank() == 0:
-print('> initializing model parallel cuda seeds on global rank {}, '
-      'model parallel rank {}, and data parallel rank {} with '
-      'model parallel seed: {} and data parallel seed: {}'.format(
-          torch.distributed.get_rank(),
-          mpu.get_model_parallel_rank(),
-          mpu.get_data_parallel_rank(),
-          model_parallel_seed,
-          data_parallel_seed),
-      flush=True)
+logger.info(
+    '> initializing model parallel cuda seeds on global rank {}, '
+    'model parallel rank {}, and data parallel rank {} with '
+    'model parallel seed: {} and data parallel seed: {}'.format(
+        torch.distributed.get_rank(),
+        mpu.get_model_parallel_rank(),
+        mpu.get_data_parallel_rank(),
+        model_parallel_seed,
+        data_parallel_seed),
+)
_CUDA_RNG_STATE_TRACKER.reset()
# Set the default state.
torch.cuda.manual_seed(data_parallel_seed)
@@ -348,15 +353,15 @@ class CheckpointFunction(torch.autograd.Function):
if cuda_device is None:
see_memory_usage("First Forward Begining", force=True)
if dist.get_rank() == 0:
-print(f"Activation Checkpointing Information")
+logger.info(f"Activation Checkpointing Information")
-print(
+logger.info(
    f"----Partition Activations {PARTITION_ACTIVATIONS}, CPU CHECKPOINTING {PA_TO_CPU}"
)
-print(
+logger.info(
    f"----contiguous Memory Checkpointing {CONTIGUOUS_CHECKPOINTING} with {num_layers} total layers"
)
-print(f"----Synchronization {SYNCHRONIZE}")
+logger.info(f"----Synchronization {SYNCHRONIZE}")
-print(f"----Profiling {PROFILE_TIME}")
+logger.info(f"----Profiling {PROFILE_TIME}")
cuda_device = torch.cuda.current_device()
transport_stream = torch.cuda.Stream(device=cuda_device)
@@ -464,7 +469,7 @@ class CheckpointFunction(torch.autograd.Function):
else:
new_args.append(size)
#if dist.get_rank() == 0:
-# print (f"The stored tensor is {contiguous_size} and orginal one is {size} ")
+# logger.info(f"The stored tensor is {contiguous_size} and orginal one is {size} ")
ctx.save_for_backward(*new_args)
else:
@@ -562,7 +567,8 @@ def partition_activations_in_checkpoint(partition_activation):
global PARTITION_ACTIVATIONS
PARTITION_ACTIVATIONS = partition_activation
if dist.get_rank() == 0:
-print(f"**************Partition Activations {PARTITION_ACTIVATIONS}************")
+logger.info(
+    f"**************Partition Activations {PARTITION_ACTIVATIONS}************")
def set_num_layers(nlayers):
@@ -601,7 +607,7 @@ def _configure_using_config_file(deepspeed_config):
PA_TO_CPU, SYNCHRONIZE, PROFILE_TIME
config = DeepSpeedConfig(deepspeed_config).activation_checkpointing_config
-print(config.repr())
+logger.info(config.repr())
PARTITION_ACTIVATIONS = config.partition_activations
CONTIGUOUS_CHECKPOINTING = config.contiguous_memory_optimization
num_layers = config.number_checkpoints
......
@@ -4,13 +4,13 @@ Licensed under the MIT license.
"""
import torch
-import logging
import json
from deepspeed.pt.deepspeed_constants import *
from deepspeed.pt.loss_scaler import INITIAL_LOSS_SCALE, SCALE_WINDOW, DELAYED_SHIFT, MIN_LOSS_SCALE
from deepspeed.pt.deepspeed_config_utils import get_scalar_param, dict_raise_error_on_duplicate_keys
from deepspeed.pt.deepspeed_zero_config import DeepSpeedZeroConfig
from deepspeed.pt.deepspeed_checkpointing_config import DeepSpeedActivationCheckpointingConfig
+from deepspeed.pt.log_utils import logger
TENSOR_CORE_ALIGN_SIZE = 8
ADAM_OPTIMIZER = 'adam'
@@ -407,7 +407,7 @@ class DeepSpeedConfig(object):
assert False, \
'Either train_batch_size or micro_batch_per_gpu needs to be provided'
-print(
+logger.info(
    f' After Train batch {self.train_batch_size} micro_batch {self.train_micro_batch_size_per_gpu} and grad_acc {self.gradient_accumulation_steps}'
)
@@ -421,13 +421,13 @@ class DeepSpeedConfig(object):
self._do_warning_check()
def print(self, name):
-print('{}:'.format(name), flush=True)
+logger.info('{}:'.format(name))
for arg in sorted(vars(self)):
if arg != '_param_dict':
dots = '.' * (29 - len(arg))
-print(' {} {} {}'.format(arg, dots, getattr(self, arg)), flush=True)
+logger.info(' {} {} {}'.format(arg, dots, getattr(self, arg)))
-print(' json = {}'.format(
+logger.info(' json = {}'.format(
    json.dumps(self._param_dict,
               sort_keys=True,
               indent=4,
@@ -449,7 +449,7 @@ class DeepSpeedConfig(object):
vocabulary_size = self._param_dict.get(VOCABULARY_SIZE, VOCABULARY_SIZE_DEFAULT)
if vocabulary_size and vocabulary_size % TENSOR_CORE_ALIGN_SIZE != 0:
-logging.warning(
+logger.warning(
    "DeepSpeedConfig: vocabulary size {} is not aligned to {}, may import tensor core utilization."
    .format(vocabulary_size,
            TENSOR_CORE_ALIGN_SIZE))
@@ -458,12 +458,12 @@ class DeepSpeedConfig(object):
MAX_GRAD_NORM in self.optimizer_params.keys() and \
self.optimizer_params[MAX_GRAD_NORM] > 0:
if fp16_enabled:
-logging.warning(
+logger.warning(
    'DeepSpeedConfig: In FP16 mode, DeepSpeed will pass {}:{} to FP16 wrapper'
    .format(MAX_GRAD_NORM,
            self.optimizer_params[MAX_GRAD_NORM]))
else:
-logging.warning(
+logger.warning(
    'DeepSpeedConfig: In FP32 mode, DeepSpeed does not permit MAX_GRAD_NORM ({}) > 0, setting to zero'
    .format(self.optimizer_params[MAX_GRAD_NORM]))
self.optimizer_params[MAX_GRAD_NORM] = 0.0
@@ -10,6 +10,8 @@ import base64
from collections import defaultdict
from argparse import ArgumentParser, REMAINDER
+from deepspeed.pt.log_utils import logger
def parse_args():
parser = ArgumentParser(description="DeepSpeed distributed training launch"
@@ -59,24 +61,24 @@ def main():
for k in current_env.keys():
if "NCCL" in k:
-print(args.node_rank, k, current_env[k], flush=True)
+logger.info("%s %s %s", args.node_rank, k, current_env[k])
world_info = None
assert args.world_info != "None", "must provide world info dict"
world_info = base64.urlsafe_b64decode(args.world_info)
world_info = json.loads(world_info)
-print("WORLD INFO DICT: {}".format(world_info), flush=True)
+logger.info("WORLD INFO DICT: {}".format(world_info))
node_list = list(world_info.keys())
args.nnodes = len(node_list)
local_node = node_list[args.node_rank]
local_gpu_ids = world_info[local_node]
num_local_procs = len(local_gpu_ids)
-print("nnodes={}, num_local_procs={}, node_rank={}".format(
-    args.nnodes,
-    num_local_procs,
-    args.node_rank),
-      flush=True)
+logger.info(
+    "nnodes={}, num_local_procs={}, node_rank={}".format(args.nnodes,
+                                                          num_local_procs,
+                                                          args.node_rank),
+)
global_rank_mapping = defaultdict(list)
curr_global_rank = 0
@@ -87,11 +89,11 @@ def main():
for gid in gids:
global_rank_mapping[node_id].append(curr_global_rank)
curr_global_rank += 1
-print("global_rank_mapping={}".format(global_rank_mapping), flush=True)
+logger.info("global_rank_mapping={}".format(global_rank_mapping))
-print("dist_world_size={}".format(dist_world_size), flush=True)
+logger.info("dist_world_size={}".format(dist_world_size))
current_env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, local_gpu_ids))
-print("Setting CUDA_VISIBLE_DEVICES={}".format(current_env["CUDA_VISIBLE_DEVICES"]),
-      flush=True)
+logger.info("Setting CUDA_VISIBLE_DEVICES={}".format(
+    current_env["CUDA_VISIBLE_DEVICES"]))
exclusion_counts_per_node = None
# set PyTorch distributed related environmental variables
......
@@ -2,7 +2,6 @@
Copyright 2019 The Microsoft DeepSpeed Team
'''
-import logging
import torch
import os
import warnings
@@ -15,6 +14,7 @@ from tensorboardX import SummaryWriter
from deepspeed.pt.deepspeed_timer import ThroughputTimer, SynchronizedWallClockTimer
from deepspeed.pt.deepspeed_zero_optimizer import FP16_DeepSpeedZeroOptimizer
from deepspeed.pt.zero_optimizer_stage1 import FP16_DeepSpeedZeroOptimizer_Stage1
+from deepspeed.pt.log_utils import logger
import deepspeed.pt.deepspeed_checkpointing as deepspeed_activation_checkpointing
from deepspeed.pt.fp16_optimizer import FP16_Optimizer
@@ -42,7 +42,7 @@ except ImportError:
try:
_ = warned_flatten
except NameError:
-print(
+logger.warning(
    "Warning: apex was installed without --cpp_ext. Falling back to Python flatten and unflatten."
)
warned_flatten = True
@@ -69,7 +69,9 @@ def _initialize_parameter_parallel_groups(parameter_parallel_size=None):
data_parallel_size = int(dist.get_world_size())
if parameter_parallel_size is None:
parameter_parallel_size = int(data_parallel_size)
-print(data_parallel_size, parameter_parallel_size)
+logger.info("data_parallel_size: %s, parameter_parallel_size: %s",
+            data_parallel_size,
+            parameter_parallel_size)
assert data_parallel_size % parameter_parallel_size == 0, \
'world size should be divisible by parameter parallel size'
rank = dist.get_rank()
@@ -83,10 +85,10 @@ def _initialize_parameter_parallel_groups(parameter_parallel_size=None):
def print_configuration(args, name):
-print('{}:'.format(name), flush=True)
+logger.info('{}:'.format(name))
for arg in sorted(vars(args)):
dots = '.' * (29 - len(arg))
-print(' {} {} {}'.format(arg, dots, getattr(args, arg)), flush=True)
+logger.info(' {} {} {}'.format(arg, dots, getattr(args, arg)))
class DeepSpeedLight(Module):
@@ -105,10 +107,6 @@ class DeepSpeedLight(Module):
config_params=None):
super(DeepSpeedLight, self).__init__()
-logging.basicConfig(level=logging.INFO,
-                    format="[%(levelname)s %(asctime)s] %(message)s",
-                    datefmt="%Y-%m-%d %H:%M:%S")
self.client_optimizer = optimizer
self.client_model_parameters = model_parameters
self.client_lr_scheduler = lr_scheduler
@@ -131,11 +129,11 @@ class DeepSpeedLight(Module):
self.dist_backend = "nccl"
if dist_init_required:
if not dist.is_initialized():
-logging.info("Initializing torch distributed with backend: {}".format(
+logger.info("Initializing torch distributed with backend: {}".format(
    self.dist_backend))
dist.init_process_group(backend=self.dist_backend)
else:
-logging.warning(
+logger.warning(
    "Was given dist_init_required=True but detected that torch"
    "distributed was already initialized, cannot initialize twice.")
@@ -178,8 +176,8 @@ class DeepSpeedLight(Module):
for name, module in self.module.named_modules():
if isinstance(module, torch.nn.Embedding):
self.csr_tensor_module_names.add(name + ".weight")
-logging.info("Will convert {} to sparse (csr) "
-             "tensor during training".format(name))
+logger.info("Will convert {} to sparse (csr) "
+            "tensor during training".format(name))
self.save_non_zero_checkpoint = False
self.save_zero_checkpoint = False
@@ -216,7 +214,7 @@ class DeepSpeedLight(Module):
os.environ['MASTER_ADDR'] = master_addr
os.environ['MASTER_PORT'] = TORCH_DISTRIBUTED_DEFAULT_PORT
-logging.info(
+logger.info(
    "Discovered MPI settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
    .format(os.environ['RANK'],
            args.local_rank,
@@ -354,13 +352,13 @@ class DeepSpeedLight(Module):
# First check for scheduler in json configuration
lr_scheduler = self._scheduler_from_config(self.optimizer)
if lr_scheduler:
-logging.info(
+logger.info(
    f'DeepSpeed using configured LR scheduler = {self.scheduler_name()}')
self.lr_scheduler = lr_scheduler
else:
-logging.warning('DeepSpeed using client LR scheduler')
+logger.warning('DeepSpeed using client LR scheduler')
self.lr_scheduler = client_lr_scheduler
-logging.info(f'DeepSpeed LR Scheduler = {self.lr_scheduler}')
+logger.info(f'DeepSpeed LR Scheduler = {self.lr_scheduler}')
def _configure_checkpointing(self, dist_init_required):
@@ -401,7 +399,7 @@ class DeepSpeedLight(Module):
self.device = torch.device("cuda", self.local_rank)
self.world_size = dist.get_world_size()
self.global_rank = dist.get_rank()
-logging.info("Set device to local rank {} within node.".format(
+logger.info("Set device to local rank {} within node.".format(
    self.local_rank))
else:
self.world_size = 1
@@ -418,7 +416,7 @@ class DeepSpeedLight(Module):
# Validate command line arguments
def _do_args_sanity_check(self, args):
if hasattr(args, 'deepscale_config') and args.deepscale_config is not None:
-logging.warning(
+logger.warning(
    "************ --deepscale_config is deprecated, please use --deepspeed_config ************"
)
if hasattr(args, 'deepspeed_config'):
@@ -464,7 +462,7 @@ class DeepSpeedLight(Module):
self.data_parallel_group = self.mpu.get_data_parallel_group()
self.dp_world_size = self.mpu.get_data_parallel_world_size()
src_rank = _get_global_rank(self.mpu.get_data_parallel_group(), 0)
-print(f"global src_rank={src_rank}")
+logger.info(f"global src_rank={src_rank}")
for p in self.module.parameters():
if torch.is_tensor(p):
dist.broadcast(p, src_rank, group=self.data_parallel_group)
@@ -478,21 +476,21 @@ class DeepSpeedLight(Module):
def _configure_optimizer(self, client_optimizer, model_parameters):
if client_optimizer is not None:
basic_optimizer = client_optimizer
-logging.info('Using client Optimizer as basic optimizer')
+logger.info('Using client Optimizer as basic optimizer')
else:
basic_optimizer = self._configure_basic_optimizer(model_parameters)
-logging.info(
+logger.info(
    'Using DeepSpeed Optimizer param name {} as basic optimizer'.format(
        self.optimizer_name()))
-logging.info('DeepSpeed Basic Optimizer = {}'.format(basic_optimizer))
+logger.info('DeepSpeed Basic Optimizer = {}'.format(basic_optimizer))
if self.zero_optimization():
if self.optimizer_name() != ADAM_OPTIMIZER:
assert self.zero_allow_untested_optimizer(), \
'You are using an untested ZeRO Optimizer. Please add <"zero_allow_untested_optimizer": true> in the configuration file to use it.'
-logging.warning(
+logger.warning(
    "**** You are using ZeRO with an untested optimizer, proceed with caution *****"
)
self.optimizer = self._configure_zero_optimizer(basic_optimizer)
@@ -501,7 +499,7 @@ class DeepSpeedLight(Module):
else:
self.optimizer = basic_optimizer
-# logging.info('DeepSpeed Final Optimizer = {}'.format(self.optimizer.state_dict()))
+# logger.info('DeepSpeed Final Optimizer = {}'.format(self.optimizer.state_dict()))
def _configure_basic_optimizer(self, model_parameters):
optimizer_parameters = self.optimizer_params()
@@ -525,7 +523,7 @@ class DeepSpeedLight(Module):
clip_grad = self.gradient_clipping()
if self.optimizer_name() == ADAM_OPTIMIZER:
if self.dynamic_loss_scale():
-logging.info('Creating fp16 optimizer with dynamic loss scale')
+logger.info('Creating fp16 optimizer with dynamic loss scale')
optimizer = FP16_Optimizer(
    optimizer,
    dynamic_loss_scale=True,
@@ -535,7 +533,7 @@ class DeepSpeedLight(Module):
    clip_grad=clip_grad,
    fused_adam_legacy=self.optimizer_legacy_fusion())
else:
-logging.info('Creating fp16 optimizer with static loss scale: {}'.format(
+logger.info('Creating fp16 optimizer with static loss scale: {}'.format(
    self.loss_scale()))
optimizer = FP16_Optimizer(
    optimizer,
@@ -544,7 +542,7 @@ class DeepSpeedLight(Module):
    clip_grad=clip_grad,
    fused_adam_legacy=self.optimizer_legacy_fusion())
else:
-logging.info('Creating fp16 unfused optimizer with dynamic loss scale')
+logger.info('Creating fp16 unfused optimizer with dynamic loss scale')
optimizer = FP16_UnfusedOptimizer(
    optimizer,
    dynamic_loss_scale=self.dynamic_loss_scale(),
@@ -557,11 +555,11 @@ class DeepSpeedLight(Module):
def _configure_zero_optimizer(self, optimizer):
zero_stage = self.zero_optimization_stage()
-logging.info('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage))
+logger.info('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage))
if zero_stage == ZERO_OPTIMIZATION_OPTIMIZER_STATES:
assert self.zero_reduce_scatter(), 'Stage 1 only supports reduce scatter mode'
-logging.info('Creating fp16 ZeRO Optimizer Stage 1')
+logger.info('Creating fp16 ZeRO Optimizer Stage 1')
optimizer = FP16_DeepSpeedZeroOptimizer_Stage1(
    optimizer,
    static_loss_scale=self.loss_scale(),
@@ -593,7 +591,7 @@ class DeepSpeedLight(Module):
    gradient_predivide_factor=self.gradient_predivide_factor())
else:
raise NotImplementedError("ZeRO stage {} not implemented".format(zero_stage))
-logging.info('Creating fp16 zero stage {} optimizer'.format(zero_stage))
+logger.info('Creating fp16 zero stage {} optimizer'.format(zero_stage))
return optimizer
@@ -658,7 +656,7 @@ class DeepSpeedLight(Module):
else:
scaled_loss = prescaled_loss
if self.warn_unscaled_loss:
-logging.warning(
+logger.warning(
    f'DeepSpeed unable to scale loss because of type: {type(prescaled_loss)}'
)
self.warn_unscaled_loss = False
@@ -892,7 +890,7 @@ class DeepSpeedLight(Module):
def _report_progress(self, step):
lr = self.get_lr()
mom = self.get_mom()
-logging.info('rank:{} step={}, skipped={}, lr={}, mom={}'.format(
+logger.info('rank:{} step={}, skipped={}, lr={}, mom={}'.format(
    self.global_rank,
    step,
    self.skipped_steps,
@@ -1096,12 +1094,12 @@ class DeepSpeedLight(Module):
load_path = self._get_ckpt_name(load_dir, tag)
if not os.path.exists(load_path):
-logging.warn(
+logger.warn(
    'Client provided checkpoint load path: {} does not exist ... skip checkpoint load'
    .format(load_path))
return None, None
-logging.info('Loading checkpoint: {}'.format(load_path))
+logger.info('Loading checkpoint: {}'.format(load_path))
checkpoint = torch.load(load_path, map_location=lambda storage, loc: storage)
self.load_module_state_dict(state_dict=checkpoint['module'],
@@ -1136,7 +1134,7 @@ class DeepSpeedLight(Module):
zero_checkpoint_name = self._get_zero_ckpt_name(load_dir, tag)
if not os.path.exists(zero_checkpoint_name):
-logging.warn(
+logger.warn(
    'Client provided checkpoint load path: {} does not exist ... skip checkpoint load'
    .format(zero_checkpoint_name))
return None
@@ -1144,7 +1142,7 @@ class DeepSpeedLight(Module):
zero_sd = torch.load(zero_checkpoint_name, map_location='cpu')
self.optimizer.load_state_dict(zero_sd['optimizer_state_dict'],
                               load_optimizer_states=load_optimizer_states)
-logging.info('loading zero checkpoint {}'.format(zero_checkpoint_name))
+logger.info('loading zero checkpoint {}'.format(zero_checkpoint_name))
def save_checkpoint(self, save_dir, tag, client_state={}):
r"""Save training checkpoint
@@ -1180,7 +1178,7 @@ class DeepSpeedLight(Module):
checkpoint_name = self._get_zero_ckpt_name(save_dir, tag)
self._ensure_directory_exists(checkpoint_name)
except:
-logging.error(
+logger.error(
    f'Failed Saving model checkpoint to {save_dir} with tag {tag}')
return False
dist.barrier()
@@ -1207,7 +1205,7 @@ class DeepSpeedLight(Module):
}
state.update(client_state)
-logging.info('Saving model checkpoint: {}'.format(save_path))
+logger.info('Saving model checkpoint: {}'.format(save_path))
torch.save(state, save_path)
def _save_zero_checkpoint(self, save_path, tag):
@@ -1215,4 +1213,4 @@ class DeepSpeedLight(Module):
#self._ensure_directory_exists(zero_checkpoint_name)
zero_sd = {'optimizer_state_dict': self.optimizer.state_dict()}
torch.save(zero_sd, zero_checkpoint_name)
-logging.info('zero checkpoint saved {}'.format(zero_checkpoint_name))
+logger.info('zero checkpoint saved {}'.format(zero_checkpoint_name))
@@ -13,7 +13,7 @@ from torch.optim import Optimizer
from typing import Union, List
import math
from deepspeed.pt.deepspeed_constants import *
-import logging
+from deepspeed.pt.log_utils import logger
LR_SCHEDULE = 'lr_schedule'
LR_RANGE_TEST = 'LRRangeTest'
@@ -550,7 +550,7 @@ class OneCycle(object):
last_batch_iteration):
if 'betas' not in optimizer.defaults:
optimizer_name = type(optimizer).__name__
-logging.warn(
+logger.warn(
    f"cycle_momentum is disabled because optimizer {optimizer_name} does not support momentum, no betas attribute in defaults"
)
self.cycle_momentum = False
......
@@ -7,7 +7,6 @@ import sys
import json
import shutil
import base64
-import logging
import argparse
import subprocess
import collections
@@ -16,6 +15,7 @@ from copy import deepcopy
import torch.cuda
from deepspeed.pt.deepspeed_constants import TORCH_DISTRIBUTED_DEFAULT_PORT
+from deepspeed.pt.log_utils import logger
DLTS_HOSTFILE = "/job/hostfile"
EXPORT_ENVS = ["NCCL", "PYTHON"]
@@ -87,8 +87,8 @@ def parse_args(args=None):
def fetch_hostfile(hostfile_path):
if not os.path.isfile(hostfile_path):
-logging.warning("Unable to find hostfile, will proceed with training "
-                "with local resources only.")
+logger.warning("Unable to find hostfile, will proceed with training "
+               "with local resources only.")
return None
# e.g., worker-0 slots=16
@@ -101,12 +101,12 @@ def fetch_hostfile(hostfile_path):
_, slot_count = slots.split("=")
slot_count = int(slot_count)
except ValueError as err:
-logging.error("Hostfile is not formatted correctly, unable to "
-              "proceed with training.")
+logger.error("Hostfile is not formatted correctly, unable to "
+             "proceed with training.")
raise err
if hostname in resource_pool:
-logging.error("Hostfile contains duplicate hosts, unable to "
-              "proceed with training.")
+logger.error("Hostfile contains duplicate hosts, unable to "
+             "proceed with training.")
raise ValueError("host {} is already defined".format(hostname))
resource_pool[hostname] = slot_count
@@ -169,7 +169,7 @@ def parse_resource_filter(host_info, include_str="", exclude_str=""):
filtered_hosts[hostname] = slots
elif exclude_str:
for s in slots:
-print('removing {} from {}'.format(s, hostname))
+logger.info('removing {} from {}'.format(s, hostname))
filtered_hosts[hostname].remove(s)
# User just specified the whole node
@@ -256,7 +256,7 @@ def main(args=None):
hostname_cmd = ["ssh {} hostname -I".format(first_host)]
result = subprocess.check_output(hostname_cmd, shell=True)
args.master_addr = result.decode('utf-8').split()[0]
-logging.info("Using IP address of {} for node {}".format(
+logger.info("Using IP address of {} for node {}".format(
    args.master_addr,
    first_host))
@@ -292,7 +292,7 @@ def main(args=None):
env['PDSH_RCMD_TYPE'] = 'ssh'
active_workers = ",".join(active_resources.keys())
-logging.info("Running on the following workers: %s" % active_workers)
+logger.info("Running on the following workers: %s" % active_workers)
pdsh_cmd_args = ['pdsh', '-w', active_workers]
@@ -330,13 +330,10 @@ def main(args=None):
"--master_port={}".format(args.master_port)
]
cmd = pdsh_cmd_args + deepspeed_launch + [args.user_script] + args.user_args
-print("cmd={}".format(cmd), flush=True)
+logger.info("cmd={}".format(cmd))
result = subprocess.Popen(cmd, env=env)
result.wait()
if __name__ == "__main__":
-logging.basicConfig(level=logging.INFO,
-                    format="[%(levelname)s %(asctime)s] %(message)s",
-                    datefmt="%Y-%m-%d %H:%M:%S")
main()
@@ -3,17 +3,18 @@ Copyright 2019 The Microsoft DeepSpeed Team
'''
import time
-import logging
import psutil
import torch
+from deepspeed.pt.log_utils import logger
def print_rank_0(message):
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
-print(message, flush=True)
+logger.info(message)
else:
-print(message, flush=True)
+logger.info(message)
class SynchronizedWallClockTimer:
@@ -117,7 +118,7 @@ class ThroughputTimer():
self.monitor_memory = monitor_memory
self.logging = logging_fn
if self.logging is None:
-self.logging = logging.info
+self.logging = logger.info
self.initialized = False
def update_epoch_count(self):
......
@@ -9,6 +9,8 @@ Helper functions and classes from multiple sources.
import torch
from torch._six import inf
+from deepspeed.pt.log_utils import logger
class CheckOverflow(object):
'''Checks for overflow in gradient across parallel process'''
@@ -112,7 +114,7 @@ def _handle_overflow(cpu_sum, x, i):
if not math.isfinite(float(v)):
t_i = v_i
break
-print(
+logger.info(
    f"rank {rank} detected overflow {cpu_sum} in tensor {i}:{t_i} shape {x.shape}"
)
@@ -253,21 +255,20 @@ def see_memory_usage(message):
return
# Print message except when distributed but not rank 0
-print(message, flush=True)
-print("Memory Allocated ",
-      torch.cuda.memory_allocated() / (1024 * 1024 * 1024),
-      "GigaBytes",
-      flush=True)
-print("Max Memory Allocated ",
-      torch.cuda.max_memory_allocated() / (1024 * 1024 * 1024),
-      "GigaBytes",
-      flush=True)
-print("Cache Allocated ",
-      torch.cuda.memory_cached() / (1024 * 1024 * 1024),
-      "GigaBytes",
-      flush=True)
-print("Max cache Allocated ",
-      torch.cuda.max_memory_cached() / (1024 * 1024 * 1024),
-      "GigaBytes",
-      flush=True)
-print(" ", flush=True)
+logger.info(message)
+logger.info(
+    "Memory Allocated %s GigaBytes ",
+    torch.cuda.memory_allocated() / (1024 * 1024 * 1024),
+)
+logger.info(
+    "Max Memory Allocated %s GigaBytes",
+    torch.cuda.max_memory_allocated() / (1024 * 1024 * 1024),
+)
+logger.info(
+    "Cache Allocated %s GigaBytes",
+    torch.cuda.memory_cached() / (1024 * 1024 * 1024),
+)
+logger.info(
+    "Max cache Allocated %s GigaBytes",
+    torch.cuda.max_memory_cached() / (1024 * 1024 * 1024),
+)
@@ -3,9 +3,9 @@ Copyright (c) Microsoft Corporation
Licensed under the MIT license.
"""
-import logging
#from deepspeed.pt.deepspeed_constants import *
from deepspeed.pt.deepspeed_config_utils import get_scalar_param
+from deepspeed.pt.log_utils import logger
#########################################
# ZeRO optimization
@@ -104,7 +104,7 @@ class DeepSpeedZeroConfig(object):
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEPRECATED,
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT)
-logging.warning(
+logger.warning(
    'DeepSpeedConfig: this format of ZeRO optimization setup is deprecated. Please use the following format: {}'
    .format(ZERO_FORMAT))
return zero_config_dict
......
@@ -17,6 +17,8 @@ from deepspeed.pt.deepspeed_utils import see_memory_usage, is_model_parallel_par
#with gradient partitioning and without
pg_correctness_test = False
+from deepspeed.pt.log_utils import logger
try:
from apex_C import flatten
from apex_C import unflatten
@@ -24,8 +26,8 @@ except ImportError:
try:
_ = warned_flatten
except NameError:
-print(
-    "Warning: apex was installed without --cpp_ext. Falling back to Python flatten and unflatten."
+logger.warning(
+    "apex was installed without --cpp_ext. Falling back to Python flatten and unflatten."
)
warned_flatten = True
from torch._utils import _flatten_dense_tensors as flatten
@@ -117,8 +119,8 @@ class FP16_DeepSpeedZeroOptimizer(object):
gradient_predivide_factor=1.0):
if dist.get_rank() == 0:
-print(f"Reduce bucket size {reduce_bucket_size}")
+logger.info(f"Reduce bucket size {reduce_bucket_size}")
-print(f"Allgather bucket size {allgather_bucket_size}")
+logger.info(f"Allgather bucket size {allgather_bucket_size}")
# The fused optimizer does all the work. We need this layer for two reason:
# 1. maintain same user API from apex.fp16_utils
# 2. keep common stuff here in case we need to add ne552w fused optimizer later
@@ -217,7 +219,6 @@ class FP16_DeepSpeedZeroOptimizer(object):
if dist.get_rank(group=self.dp_process_group) == 0:
see_memory_usage(
    f"After Flattening and after emptying param group {i} cache")
-print("")
# set model fp16 weight to slices of flattened buffer
updated_params = _unflatten_dense_tensors(self.fp16_groups_flat[i],
@@ -345,11 +346,10 @@ class FP16_DeepSpeedZeroOptimizer(object):
see_memory_usage("After initializing optimizer states")
if dist.get_rank() == 0:
-print(f"optimizer state initialized")
+logger.info(f"optimizer state initialized")
if dist.get_rank(group=self.dp_process_group) == 0:
see_memory_usage(f"After initializing ZeRO optimizer")
-print("")
def _release_ipg_buffers(self):
if self.contiguous_gradients:
@@ -417,7 +417,7 @@ class FP16_DeepSpeedZeroOptimizer(object):
self.report_ipg_memory_usage(f"In ipg_epilogue after reduce_ipg_grads", 0)
#if dist.get_rank() == 0:
-# print("Params already reduced ", self.params_already_reduced)
+# logger.info("Params already reduced %s", self.params_already_reduced)
for i in range(len(self.params_already_reduced)):
self.params_already_reduced[i] = False
@@ -572,7 +572,7 @@ class FP16_DeepSpeedZeroOptimizer(object):
def print_rank_0(self, message):
if dist.get_rank() == 0:
-print(message)
+logger.info(message)
def gradient_reduction_w_predivide(self, tensor):
dp_world_size = dist.get_world_size(group=self.dp_process_group)
@@ -740,7 +740,7 @@ class FP16_DeepSpeedZeroOptimizer(object):
flatten_tensor = _flatten_dense_tensors(tensors)
def print_func():
-print(flatten_tensor.contiguous().view(-1).narrow(0, start, n))
+logger.info(flatten_tensor.contiguous().view(-1).narrow(0, start, n))
self.sequential_execution(print_func, message)
@@ -779,7 +779,7 @@ class FP16_DeepSpeedZeroOptimizer(object):
if group is None:
group = self.dp_process_group
if dist.get_rank(group=group) == 0:
-print(message)
+logger.info(message)
for id in range(dist.get_world_size(group=group)):
if id == dist.get_rank(group=group):
function()
@@ -984,7 +984,7 @@ class FP16_DeepSpeedZeroOptimizer(object):
else:
total_norm = 0.0
#if dist.get_rank() == 0:
-# print(f"Total Norm begining {total_norm}")
+# logger.info(f"Total Norm begining {total_norm}")
for g, p in zip(gradients, params):
if is_model_parallel_parameter(p) or (self.model_parallel_rank == 0):
param_norm = g.data.double().norm(2)
@@ -1080,7 +1080,7 @@ class FP16_DeepSpeedZeroOptimizer(object):
self.zero_grad()
see_memory_usage('After overflow after clearing gradients')
-print(
+logger.info(
    "[deepscale] OVERFLOW! Rank {} Skipping step. Attempted loss scale: {}, "
    "reducing to {}".format(dist.get_rank(),
                            prev_scale,
@@ -1417,6 +1417,6 @@ def _handle_overflow(cpu_sum, x, i):
if not math.isfinite(float(v)):
t_i = v_i
break
-print(
+logger.info(
    f"rank {rank} detected overflow {cpu_sum} in tensor {i}:{t_i} shape {x.shape}"
)
@@ -6,12 +6,12 @@ This file is adapted from FP16_Optimizer in NVIDIA/apex
'''
import torch
-import logging
import math
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from deepspeed.pt.deepspeed_utils import get_grad_norm, CheckOverflow, get_weight_norm
from deepspeed.pt.loss_scaler import INITIAL_LOSS_SCALE, SCALE_WINDOW, MIN_LOSS_SCALE
+from deepspeed.pt.log_utils import logger
class FP16_Optimizer(object):
@@ -137,9 +137,10 @@ class FP16_Optimizer(object):
if self.overflow:
if self.verbose:
-print("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
-      "scale: {}, reducing to {}".format(prev_scale,
-                                         self.cur_scale))
+logger.info("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
+            "scale: {}, reducing to {}".format(
+                prev_scale,
+                self.cur_scale))
return self.overflow
combined_scale = self.unscale_and_clip_grads(grads_groups_flat,
                                             norm_groups,
@@ -190,9 +191,10 @@ class FP16_Optimizer(object):
if self.overflow:
if self.verbose:
-print("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
-      "scale: {}, reducing to {}".format(prev_scale,
-                                         self.cur_scale))
+logger.info("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
+            "scale: {}, reducing to {}".format(
+                prev_scale,
+                self.cur_scale))
return self.overflow
self.unscale_and_clip_grads(grads_groups_flat, norm_groups)
@@ -250,8 +252,8 @@ class FP16_Optimizer(object):
    self.min_loss_scale)
self.last_overflow_iter = self.cur_iter
if self.verbose:
-print(f"\nGrad overflow on iteration {self.cur_iter}")
+logger.info(f"\nGrad overflow on iteration {self.cur_iter}")
-print(
+logger.info(
    f"Reducing dynamic loss scale from {prev_scale} to {self.cur_scale}"
)
else:
@@ -260,14 +262,15 @@ class FP16_Optimizer(object):
if (stable_interval > 0) and (stable_interval % self.scale_window == 0):
self.cur_scale *= self.scale_factor
if self.verbose:
-print(f"\nNo Grad overflow for {self.scale_window} iterations")
-print(
-    f"Increasing dynamic loss scale from {prev_scale} to {self.cur_scale}"
-)
+logger.info(
+    f"No Grad overflow for {self.scale_window} iterations")
+logger.info(
+    f"Increasing dynamic loss scale from {prev_scale} to {self.cur_scale}"
+)
else:
if skip:
-print("\nGrad overflow on iteration", self.cur_iter)
+logger.info("Grad overflow on iteration: %s", self.cur_iter)
-print("Using static loss scale of", self.cur_scale)
+logger.info("Using static loss scale of: %s", self.cur_scale)
self.cur_iter += 1
return
......
@@ -8,10 +8,10 @@ This file is adapted from FP16_Optimizer in NVIDIA/apex
import torch
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
import math
-import logging
from deepspeed.pt.deepspeed_utils import get_grad_norm, CheckOverflow, get_weight_norm
from deepspeed.pt.loss_scaler import INITIAL_LOSS_SCALE, SCALE_WINDOW, MIN_LOSS_SCALE
+from deepspeed.pt.log_utils import logger
class FP16_UnfusedOptimizer(object):
@@ -33,7 +33,7 @@ class FP16_UnfusedOptimizer(object):
self.fused_lamb_legacy = fused_lamb_legacy
if torch.distributed.get_rank() == 0:
-logging.info(f'Fused Lamb Legacy : {self.fused_lamb_legacy} ')
+logger.info(f'Fused Lamb Legacy : {self.fused_lamb_legacy} ')
if not torch.cuda.is_available:
raise SystemError("Cannot use fp16 without CUDA.")
@@ -137,9 +137,10 @@ class FP16_UnfusedOptimizer(object):
self._update_scale(self.overflow)
if self.overflow:
if self.verbose:
-print("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
-      "scale: {}, reducing to {}".format(prev_scale,
-                                         self.cur_scale))
+logger.info("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
+            "scale: {}, reducing to {}".format(
+                prev_scale,
+                self.cur_scale))
return self.overflow
combined_scale = self.unscale_and_clip_grads(norm_groups, apply_scale=False)
@@ -162,9 +163,10 @@ class FP16_UnfusedOptimizer(object):
self._update_scale(self.overflow)
if self.overflow:
if self.verbose:
-print("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
-      "scale: {}, reducing to {}".format(prev_scale,
-                                         self.cur_scale))
+logger.info("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
+            "scale: {}, reducing to {}".format(
+                prev_scale,
+                self.cur_scale))
return self.overflow
norm_groups = []
@@ -236,8 +238,8 @@ class FP16_UnfusedOptimizer(object):
    self.min_loss_scale)
self.last_overflow_iter = self.cur_iter
if self.verbose:
-print("\nGrad overflow on iteration", self.cur_iter)
+logger.info("Grad overflow on iteration: %s", self.cur_iter)
-print(
+logger.info(
    f"Reducing dynamic loss scale from {prev_scale} to {self.cur_scale}"
)
else:
@@ -246,14 +248,15 @@ class FP16_UnfusedOptimizer(object):
if (stable_interval > 0) and (stable_interval % self.scale_window == 0):
self.cur_scale *= self.scale_factor
if self.verbose:
-print(f"\nNo Grad overflow for {self.scale_window} iterations")
-print(
-    f"Increasing dynamic loss scale from {prev_scale} to {self.cur_scale}"
-)
+logger.info(
+    f"No Grad overflow for {self.scale_window} iterations")
+logger.info(
+    f"Increasing dynamic loss scale from {prev_scale} to {self.cur_scale}"
+)
else:
if skip:
-print("\nGrad overflow on iteration", self.cur_iter)
+logger.info("Grad overflow on iteration %s", self.cur_iter)
-print("Using static loss scale of", self.cur_scale)
+logger.info("Using static loss scale of %s", self.cur_scale)
self.cur_iter += 1
return
......
import logging
import sys
import torch.distributed as dist
class LoggerFactory:
@staticmethod
def create_logger(name=None, level=logging.INFO):
"""create a logger
Args:
name (str): name of the logger
level (int): logging level of the logger
Raises:
    ValueError: if name is None
"""
if name is None:
raise ValueError("name for logger cannot be None")
formatter = logging.Formatter(
"[%(asctime)s] [%(levelname)s] "
"[%(filename)s:%(lineno)d:%(funcName)s] %(message)s")
logger_ = logging.getLogger(name)
logger_.setLevel(level)
logger_.propagate = False
ch = logging.StreamHandler(stream=sys.stdout)
ch.setLevel(level)
ch.setFormatter(formatter)
logger_.addHandler(ch)
return logger_
logger = LoggerFactory.create_logger(name="DeepSpeed", level=logging.INFO)
def log_dist(message, ranks=None, level=logging.INFO):
"""Log message when one of following condition meets
+ not dist.is_initialized()
+ dist.get_rank() in ranks if ranks is not None or ranks = [-1]
Args:
message (str)
ranks (list)
level (int)
"""
should_log = not dist.is_initialized()
ranks = ranks or []
my_rank = dist.get_rank() if dist.is_initialized() else -1
if ranks and not should_log:
should_log = ranks[0] == -1
should_log = should_log or (my_rank in set(ranks))
if should_log:
final_message = "[Rank {}] {}".format(my_rank, message)
logger.log(level, final_message)
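For orientation, this is roughly how the new helpers are meant to be called from the rest of the codebase: the module-level logger covers ordinary rank-local logging, while log_dist restricts a message to selected ranks. The messages and file name below are invented purely for the example.

# Illustrative usage of the helpers defined above; all strings are made up.
import logging
from deepspeed.pt.log_utils import logger, log_dist

logger.info("loading DeepSpeed config from %s", "ds_config.json")   # stdlib-style formatting
log_dist("starting training loop", ranks=[0])                       # only rank 0 logs
log_dist("per-rank memory snapshot", ranks=[-1])                     # every rank logs
log_dist("gradient clipping engaged", ranks=[0], level=logging.WARNING)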
...@@ -4,8 +4,8 @@ import torch.distributed as dist ...@@ -4,8 +4,8 @@ import torch.distributed as dist
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from collections import defaultdict from collections import defaultdict
from deepspeed.pt.zero_utils import _initialize_parameter_parallel_groups, \ from deepspeed.pt.zero_utils import _initialize_parameter_parallel_groups
pprint from deepspeed.pt.log_utils import log_dist, logger
from deepspeed.pt.loss_scaler import LossScaler, DynamicLossScaler from deepspeed.pt.loss_scaler import LossScaler, DynamicLossScaler
from deepspeed.pt.deepspeed_utils import get_grad_norm, CheckOverflow from deepspeed.pt.deepspeed_utils import get_grad_norm, CheckOverflow
...@@ -18,9 +18,10 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list, ...@@ -18,9 +18,10 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list,
for tensor in tensor_list: for tensor in tensor_list:
num_elements = num_elements + tensor.numel() num_elements = num_elements + tensor.numel()
pprint("Total number of elements in model: {}, max elements per com: {}".format( log_dist("Total number of elements in model: {}, max elements per com: {}".format(
num_elements, num_elements,
max_elements_per_comm)) max_elements_per_comm),
ranks=[0])
max_elements_per_comm = min(max_elements_per_comm, num_elements) max_elements_per_comm = min(max_elements_per_comm, num_elements)
sub_partition_size = int(max_elements_per_comm // dp) sub_partition_size = int(max_elements_per_comm // dp)
...@@ -41,10 +42,11 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list, ...@@ -41,10 +42,11 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list,
if remaining: if remaining:
elements_to_add = alignment - remaining elements_to_add = alignment - remaining
# adding padded tensor later after we check comm alignment # adding padded tensor later after we check comm alignment
pprint("adding pad tensor for alignment, {} + {}->{}".format( log_dist("adding pad tensor for alignment, {} + {}->{}".format(
num_elements, num_elements,
elements_to_add, elements_to_add,
num_elements + elements_to_add)) num_elements + elements_to_add),
ranks=[0])
#num_elements = num_elements + elements_to_add #num_elements = num_elements + elements_to_add
else: else:
padded_tensor_list = tensor_list padded_tensor_list = tensor_list
...@@ -55,11 +57,12 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list, ...@@ -55,11 +57,12 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list,
"size" "size"
num_comm_intervals = int(num_partitions // dp) num_comm_intervals = int(num_partitions // dp)
partition_remaining = int(num_partitions % dp) partition_remaining = int(num_partitions % dp)
pprint("num_comm_intervals={}, partition_remaining={}".format( log_dist("num_comm_intervals={}, partition_remaining={}".format(
num_comm_intervals, num_comm_intervals,
partition_remaining)) partition_remaining),
ranks=[0])
if partition_remaining != 0: if partition_remaining != 0:
pprint("adding pad tensor and/or extra sub partition") log_dist("adding pad tensor and/or extra sub partition", ranks=[0])
# add pad tensor for alignment of comm interval, this overrules previous possibly sub-partition alignment # add pad tensor for alignment of comm interval, this overrules previous possibly sub-partition alignment
num_comm_intervals += 1 num_comm_intervals += 1
aligned_comm_elements = num_comm_intervals * sub_partition_size * dp aligned_comm_elements = num_comm_intervals * sub_partition_size * dp
...@@ -69,10 +72,11 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list, ...@@ -69,10 +72,11 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list,
device=tensor_list[0].device, device=tensor_list[0].device,
dtype=tensor_list[0].dtype) dtype=tensor_list[0].dtype)
padded_tensor_list = tensor_list + [pad_tensor] padded_tensor_list = tensor_list + [pad_tensor]
pprint("adding pad tensor and/or extra sub partition, {} + {}->{}".format( log_dist("adding pad tensor and/or extra sub partition, {} + {}->{}".format(
num_elements, num_elements,
elements_to_add, elements_to_add,
num_elements + elements_to_add)) num_elements + elements_to_add),
ranks=[0])
num_elements += elements_to_add num_elements += elements_to_add
elif elements_to_add > 0: elif elements_to_add > 0:
# add pad tensor for just alignment of sub-partition # add pad tensor for just alignment of sub-partition
...@@ -83,7 +87,7 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list, ...@@ -83,7 +87,7 @@ def flatten_dense_tensors_sub_partition_aligned(tensor_list,
num_elements += elements_to_add num_elements += elements_to_add
if pg is None or dist.get_rank(group=pg) == 0: if pg is None or dist.get_rank(group=pg) == 0:
print("Number of Elements (w. padding) is ", num_elements) logger.info("Number of Elements (w. padding) is %s", num_elements)
padded_num_elems = 0 padded_num_elems = 0
for p in padded_tensor_list: for p in padded_tensor_list:
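The padding that these log_dist calls report exists so the flattened parameter list divides evenly into dp-sized groups of equally long sub-partitions. Below is a small worked sketch of that arithmetic with invented sizes; it mirrors the intent of flatten_dense_tensors_sub_partition_aligned rather than its exact code path.

# Worked example of the sub-partition alignment arithmetic; all sizes are invented.
import math

num_elements = 10_000_003          # total parameters across the tensor list
max_elements_per_comm = 4_000_000  # cap on elements handled per communication interval
dp = 4                             # data-parallel world size

max_elements_per_comm = min(max_elements_per_comm, num_elements)
sub_partition_size = max_elements_per_comm // dp                     # 1_000_000 elements each
num_sub_partitions = math.ceil(num_elements / sub_partition_size)    # 11
num_comm_intervals = math.ceil(num_sub_partitions / dp)              # 3
aligned_elements = num_comm_intervals * sub_partition_size * dp      # 12_000_000
padding = aligned_elements - num_elements                            # 1_999_997 pad elements
print(sub_partition_size, num_comm_intervals, padding)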
...@@ -161,7 +165,7 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object): ...@@ -161,7 +165,7 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
self.allgather_size = allgather_size self.allgather_size = allgather_size
self.max_elements_per_comm = max_elements_per_comm self.max_elements_per_comm = max_elements_per_comm
print("max_elements_per_comm={}".format(max_elements_per_comm)) logger.info("max_elements_per_comm={}".format(max_elements_per_comm))
# param flattened by groups # param flattened by groups
self.fp16_groups = [] self.fp16_groups = []
...@@ -311,14 +315,14 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object): ...@@ -311,14 +315,14 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
assert num_sub_partitions % world_size == 0, "{} % {} != 0".format(num_sub_partitions, world_size) assert num_sub_partitions % world_size == 0, "{} % {} != 0".format(num_sub_partitions, world_size)
if not dist.is_initialized() or dist.get_rank(group=dp_process_group) == 0: if not dist.is_initialized() or dist.get_rank(group=dp_process_group) == 0:
print("**** partition info:") logger.info("**** partition info:")
print("\t total_num_elements=", total_num_elements) logger.info("\t total_num_elements=%s", total_num_elements)
print("\t world_size=", world_size) logger.info("\t world_size=%s", world_size)
print("\t max_elements_per_comm=", max_elements_per_comm) logger.info("\t max_elements_per_comm=%s", max_elements_per_comm)
print("\t sub_partition_size=", sub_partition_size) logger.info("\t sub_partition_size=%s", sub_partition_size)
print("\t num_sub_partitions=", num_sub_partitions) logger.info("\t num_sub_partitions=%s", num_sub_partitions)
print("\t num_comm_intervals=", num_comm_intervals) logger.info("\t num_comm_intervals=%s", num_comm_intervals)
print("****") logger.info("****")
# [comm_id] -> [rank] # [comm_id] -> [rank]
comm_partitions = [] comm_partitions = []
...@@ -369,14 +373,14 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object): ...@@ -369,14 +373,14 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
for iii, tensor in enumerate(tensor_list): for iii, tensor in enumerate(tensor_list):
tensor_size = tensor.numel() tensor_size = tensor.numel()
#if local_rank == 0: #if local_rank == 0:
# #print("rank={}, current_index={}, tensor_size={}, tensor-idx={}".format(rank, # # logger.info("rank={}, current_index={}, tensor_size={}, tensor-idx={}".format(rank,
# current_index, tensor_size, iii)) # current_index, tensor_size, iii))
results_list = _range_check(current_index, results_list = _range_check(current_index,
all_element_intervals[rank], all_element_intervals[rank],
tensor_size) tensor_size)
for contained, offset, comm_idx in results_list: for contained, offset, comm_idx in results_list:
#if local_rank == 0: #if local_rank == 0:
# print("rank={}, contained={}, offset={}, comm_idx={}".format(rank, contained, # logger.info("rank={}, contained={}, offset={}, comm_idx={}".format(rank, contained,
# offset, comm_idx)) # offset, comm_idx))
if contained: if contained:
if prev_comm_idx != comm_idx: if prev_comm_idx != comm_idx:
...@@ -479,7 +483,7 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object): ...@@ -479,7 +483,7 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
flat_sub_partitions.append(_flatten_dense_tensors(flat_tensor_list)) flat_sub_partitions.append(_flatten_dense_tensors(flat_tensor_list))
if num_comm_intervals is not None and len( if num_comm_intervals is not None and len(
flat_sub_partitions) < num_comm_intervals: flat_sub_partitions) < num_comm_intervals:
#print("padding w. sub partitions to ensure uniform communication") # logger.info("padding w. sub partitions to ensure uniform communication")
device = flat_sub_partitions[0].device device = flat_sub_partitions[0].device
for _ in range(num_comm_intervals - len(flat_sub_partitions)): for _ in range(num_comm_intervals - len(flat_sub_partitions)):
flat_sub_partitions.append( flat_sub_partitions.append(
...@@ -611,9 +615,10 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object): ...@@ -611,9 +615,10 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
if self.overflow: if self.overflow:
self.zero_grad() self.zero_grad()
if self.verbose: if self.verbose:
print("[deepspeed] OVERFLOW! Skipping step. Attempted loss " logger.info("[deepspeed] OVERFLOW! Skipping step. Attempted loss "
"scale: {}, reducing to {}".format(prev_scale, "scale: {}, reducing to {}".format(
self.loss_scale)) prev_scale,
self.loss_scale))
return self.overflow return self.overflow
norm_groups = [] norm_groups = []
...@@ -649,7 +654,7 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object): ...@@ -649,7 +654,7 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
default_device=self.local_sub_partitions_of_fp32_groups[i][0].device) default_device=self.local_sub_partitions_of_fp32_groups[i][0].device)
#RS: update all our local params with sub-partition grads #RS: update all our local params with sub-partition grads
#print("self.local_sub_partitions_of_fp32_groups[i]={}, local_grad_sub_partitions={}".format(len(self.local_sub_partitions_of_fp32_groups[i]), len(local_grad_sub_partitions))) #logger. info("self.local_sub_partitions_of_fp32_groups[i]={}, local_grad_sub_partitions={}".format(len(self.local_sub_partitions_of_fp32_groups[i]), len(local_grad_sub_partitions)))
for idx, sub_partition_param in enumerate(self.local_sub_partitions_of_fp32_groups[i]): for idx, sub_partition_param in enumerate(self.local_sub_partitions_of_fp32_groups[i]):
sub_partition_param.grad = local_grad_sub_partitions[idx] sub_partition_param.grad = local_grad_sub_partitions[idx]
#self.single_partition_of_fp32_groups[i].grad = single_grad_partition #self.single_partition_of_fp32_groups[i].grad = single_grad_partition
......
import torch import torch
import torch.distributed as dist import torch.distributed as dist
from deepspeed.pt.log_utils import logger
def _initialize_parameter_parallel_groups(parameter_parallel_size=None): def _initialize_parameter_parallel_groups(parameter_parallel_size=None):
data_parallel_size = int(dist.get_world_size()) data_parallel_size = int(dist.get_world_size())
if parameter_parallel_size is None: parameter_parallel_size = parameter_parallel_size or data_parallel_size
parameter_parallel_size = int(data_parallel_size) logger.info("data_parallel_size: %s, parameter_parallel_size: %s",
print(data_parallel_size, parameter_parallel_size) data_parallel_size,
parameter_parallel_size)
assert data_parallel_size % parameter_parallel_size == 0, \ assert data_parallel_size % parameter_parallel_size == 0, \
'world size should be divisible by parameter parallel size' 'world size should be divisible by parameter parallel size'
rank = dist.get_rank() rank = dist.get_rank()
my_group = None my_group = None
for i in range(dist.get_world_size() // parameter_parallel_size): for i in range(data_parallel_size // parameter_parallel_size):
ranks = range(i * parameter_parallel_size, (i + 1) * parameter_parallel_size) ranks = range(i * parameter_parallel_size, (i + 1) * parameter_parallel_size)
group = torch.distributed.new_group(ranks) group = torch.distributed.new_group(ranks)
if rank in ranks: if rank in ranks:
my_group = group my_group = group
return my_group return my_group
def pprint(msg):
if not dist.is_initialized() or dist.get_rank() == 0:
print(msg)
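As a side note on the rewritten loop above: it walks the world in consecutive blocks of parameter_parallel_size ranks, creates a group for each block (new_group must be called collectively on every rank), and keeps only the block containing the caller. The resulting layout, for made-up sizes:

# Illustrative layout of the parameter-parallel groups; sizes are invented for the example.
data_parallel_size = 8
parameter_parallel_size = 2

groups = [
    list(range(i * parameter_parallel_size, (i + 1) * parameter_parallel_size))
    for i in range(data_parallel_size // parameter_parallel_size)
]
print(groups)  # [[0, 1], [2, 3], [4, 5], [6, 7]]; rank 5 keeps only the [4, 5] group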