Unverified Commit ddcf58ca authored by Frank Lee's avatar Frank Lee Committed by GitHub
Browse files

Revert "[sync] sync feature/shardformer with develop"

parent 24651fdd
from typing import Any, Callable, Dict, List
import torch
import torch.nn as nn
from transformers.pytorch_utils import Conv1D
from ..policies.autopolicy import get_autopolicy
from ..policies.basepolicy import Policy
from ..utils.utils import getattr_, hasattr_, setattr_
from .shard_config import ShardConfig
from .slicer import Slicer
__all__ = ['ModelSharder', 'shard_model']
class ModelSharder(object):
r"""
Shard the original huggingface model according to the policy
Args:
policy (:class:`Policy`): The policy to shard the model
model (:class:`torch.Module`): The model to shard
shard_config: The setting of distributed model
"""
def __init__(
self,
model: nn.Module,
policy: Policy,
shard_config: ShardConfig = None, # TODO
) -> None:
self.model = model
self.policy = get_autopolicy(self.model) if policy is None else policy
self.slicer = Slicer(shard_config)
self.shard_config = shard_config
self.model_config = self.model.config
def shard(self) -> None:
self.reshape_embedding()
self.inject_model(self.model)
self.replace_layer(self.model)
self.bind_layer(self.model)
def reshape_embedding(self,) -> None:
r"""
Reshape the Embedding layer to make the embedding dimension divisible by world_size
"""
vocab_size = self.model_config.vocab_size
world_size = self.shard_config.world_size
if vocab_size % world_size != 0:
new_vocab_size = vocab_size + world_size - vocab_size % world_size
self.model.resize_token_embeddings(new_vocab_size)
self.model_config = self.model.config
def inject_model(
self,
model: nn.Module,
) -> None:
r"""
Replace the model to policy defined model
Mainly modify the forward and backward to fit distributed model
e.g.
::
BertForMaskedLM.forward -> BertForMaskedLM_.forward
"""
inject_policy = self.policy.inject_policy()
if inject_policy is None:
return
org_model_cls = inject_policy[0]
shard_model_cls = inject_policy[1]
if model.__class__ == org_model_cls:
for key in shard_model_cls.__dict__.keys():
if hasattr(model.__class__, key):
setattr(
model.__class__,
key,
getattr(shard_model_cls, key),
)
else:
raise NotImplementedError(f"{model.__class__} is not implemented so far")
def replace_layer(
self,
model: nn.Module,
) -> None:
r"""
Replace the layer according to the policy, and replace the layer one by one
Args:
model (:class:`torch.nn.Module`): The layer to shard
"""
argument_policies = self.policy.argument_policy(self.model_config, self.shard_config.world_size)
for argument_policy in argument_policies.items():
origin_layer_cls = argument_policy[0]
attr_dict = argument_policy[1].attr_dict
param_funcs = argument_policy[1].param_funcs
self.traverse_replace_layer(model, origin_layer_cls, attr_dict, param_funcs)
def traverse_replace_layer(
self,
layer: nn.Module,
origin_cls: nn.Module,
attr_dict: Dict[str, Any],
param_funcs: List[Callable],
) -> None:
r"""
Reverse the replace layer operation
Args:
layer (:class:`torch.nn.Module`): The object of layer to shard
origin_cls (:class:`transformers.model`): The origin layer class
attr_dict (Dict): The attribute dict to modify
policy_cls (:class:`Policy`): The policy class
"""
if layer.__class__ == origin_cls:
for k, v in attr_dict.items():
setattr_(layer, k, v, ignore=True)
self.shard_one_layer(layer, param_funcs)
for name, child in layer.named_children():
self.traverse_replace_layer(child, origin_cls, attr_dict, param_funcs)
return layer
def shard_one_layer(
self,
org_layer: nn.Module,
param_funcs: List[Callable],
) -> None:
r"""
Shard one layer according to the policy, the layer should be the same class as the key in policy's argument_policy return dict
Args:
org_layer (:class:`torch.nn.Module`): The origin layer object to shard
param_funcs (:class:`List[typing.Callable]`): The function list to get shard information in policy class
"""
for func in param_funcs:
policy_layers = func()
for policy_layer in policy_layers:
weight = None
bias = None
weight_attr = policy_layer.weight
bias_attr = policy_layer.bias
replace_layer_cls = policy_layer.replace_layer
ignore = policy_layer.ignore
n_cast = policy_layer.n_cast
reversed = policy_layer.reversed
if policy_layer.__class__.__name__ == "Col_Layer":
gather_output = policy_layer.gather_output
if weight_attr is not None:
if hasattr_(org_layer, weight_attr):
weight = getattr_(org_layer, weight_attr)
elif not ignore:
raise ValueError(f"Layer {org_layer.__class__.__qualname__} has no attribute {weight_attr}")
if bias_attr is not None:
if hasattr_(org_layer, bias_attr):
bias = getattr_(org_layer, bias_attr)
elif not ignore:
raise ValueError(f"Layer {org_layer.__class__.__qualname__} has no attribute {bias_attr}")
# dont have the attribute in policy, and ignore is true
if weight is None and bias is None and ignore:
continue
# set the sliced weight and bias to the new nn_col layer
assert weight is not None or bias is not None
layer_attr = (lambda x: x[:x.rfind(".")])(weight_attr or bias_attr)
# slice weight and bias
weight, bias = self.slicer.slice_weight_bias(weight, bias, policy_layer.__class__, n_cast, reversed)
# create new object to replace the origin layer
if replace_layer_cls is not None:
if isinstance(getattr_(org_layer, layer_attr), (nn.Linear, Conv1D)):
if replace_layer_cls.__name__ == "Linear1D_Row":
replace_layer = replace_layer_cls(weight.shape[1],
weight.shape[0],
bias=False if bias is None else True)
elif replace_layer_cls.__name__ == "Linear1D_Col":
replace_layer = replace_layer_cls(weight.shape[0],
weight.shape[1],
bias=False if bias is None else True,
gather_output=gather_output)
setattr_(org_layer, layer_attr, replace_layer, ignore=ignore)
self.set_param(replace_layer, weight, bias)
elif isinstance(getattr_(org_layer, layer_attr), nn.Embedding):
replace_layer = replace_layer_cls(weight.shape[0], weight.shape[1],
getattr_(org_layer, f"{layer_attr}.padding_idx", ignore=True))
setattr_(org_layer, layer_attr, replace_layer, ignore=ignore)
self.set_param(replace_layer, weight, bias)
else:
raise NotImplementedError(
f"Replacing {getattr_(org_layer, layer_attr).__class__} is not implemented so far")
# do not replace the layer object, just replace the weight and bias
else:
self.set_param(org_layer, layer_attr, weight, bias)
def set_param(self,
layer: Any,
weight: torch.Tensor = None,
bias: torch.Tensor = None,
layer_attr: str = "") -> None:
r"""
Reset the weight and bias of the layer object
Args:
layer (:class:`torch.nn.Module`): The layer object
layer_attr (str): The attribute name of the layer
weight (:class:`torch.Tensor`): The weight of the layer
bias (:class:`torch.Tensor`): The bias of the layer
"""
assert weight is not None or bias is not None
if weight is not None:
setattr_(layer, "weight" if layer_attr == "" else layer_attr + ".weight", nn.Parameter(weight.contiguous()))
self.set_layer_size(layer, layer_attr, weight.shape)
if bias is not None:
setattr_(layer, "bias" if layer_attr == "" else layer_attr + ".bias", nn.Parameter(bias.contiguous()))
def set_layer_size(self, layer: nn.Module, layer_attr: str, size: torch.Size) -> None:
r"""
Set the layer attribute
Args:
layer (:class:`torch.nn.Module`): The layer object
layer_attr (str): The attribute name of the layer
size (:class:`torch.Size`): The size of the tensor
"""
# Tensor.shape[0] -> out_features, Tensor.shape[1] -> in_features
attrs = ["out_features", "in_features"]
for i, attr in enumerate(attrs):
if hasattr_(layer, f"{layer_attr}.{attr}"):
setattr_(layer, f"{layer_attr}.{attr}", size[i])
def bind_layer(self, model: nn.Module) -> None:
r"""
Bind the layer according to the binding policy
Args:
model (:class:`torch.nn.Module`): The shard model
"""
binding_map = self.policy.binding_policy()
if binding_map is None:
return
for k, v in binding_map.items():
param = getattr_(model, k)
param = nn.Parameter(param)
setattr_(model, k, param)
setattr_(model, v, param)
def shard_model(model: nn.Module, shard_config: ShardConfig = None, policy: Policy = None):
r"""
The function is used to shard the PyTorch model.
Args:
model (`torch.nn.Model`): the origin huggingface model
shard_config (`ShardConfig`): the config for distribute information
policy (`Policy`): the custom policy for sharding
"""
sharder = ModelSharder(model=model, shard_config=shard_config, policy=policy)
sharder.shard()
return model
import torch
from ..policies.basepolicy import Col_Layer, Layer, Row_Layer
from .shard_config import ShardConfig
dim_mapping = {Col_Layer: 1, Row_Layer: 0}
class Slicer():
def __init__(
self,
shardconfig: ShardConfig #TODO
) -> None:
self.shardconfig = shardconfig
def slice_weight_bias(
self,
weight: torch.Tensor,
bias: torch.Tensor,
policy_layer_cls: Layer,
n_cast: int = None,
reversed: bool = False,
):
r"""
Slice the weight and bias according to policy layer cls
``Layer`` -> do nothing
``Col_Layer`` -> slice the weight and bias along dim 1
``Row_Layer`` -> slice the weight along dim 0 and do not slice bias
Args:
weight (:class:`torch.nn.Module`): The weight of the layer
bias: (:class:`torch.nn.Module`): The bias of the layer
policy_layer_class (:class:`Policy`): The class represent how to slice the tensor
"""
if policy_layer_cls == Layer:
return weight, bias
dim = dim_mapping[policy_layer_cls] if not reversed else (1 - dim_mapping[policy_layer_cls])
# print(weight.shape, dim)
if policy_layer_cls == Col_Layer:
weight = self.slice_tensor(weight, dim, False, n_cast)
bias = self.slice_tensor(bias, 0, True)
elif policy_layer_cls == Row_Layer:
weight = self.slice_tensor(weight, dim, False, n_cast)
else:
raise NotImplementedError(f"The policy layer class {policy_layer_cls} is not supported")
if reversed:
weight = weight.transpose(0, 1).contiguous()
return weight, bias
def slice_tensor(
self,
tensor_in: torch.Tensor,
dim: int,
is_bias: bool,
n_cast: int = None,
) -> torch.Tensor:
r"""
Slice tensor according to the config
Args:
tensor_in (:class:`torch.Tensor`): The tensor to slice
dim (int): The dimension to slice
is_bias (bool): Whether the tensor is bias
"""
if tensor_in is None:
return None
if not is_bias:
return self.slice_2d(tensor_in, dim, n_cast)
else:
return self.slice_1d(tensor_in, n_cast)
def slice_2d(
self,
tensor: torch.Tensor,
dim: int,
n_cast: int = None,
) -> torch.Tensor:
r"""
Slice the 2D tensor
Args:
tensor (:class:`torch.Tensor`): The tensor to slice
dim (int): The dimension to slice
"""
assert dim in [0, 1], f"Only support 2D tensor, but got {dim}D tensor"
if dim == 0:
return self.slice_row(tensor, n_cast)
elif dim == 1:
return self.slice_col(tensor, n_cast)
def slice_1d(
self,
tensor: torch.Tensor,
n_cast: int = None,
) -> torch.Tensor:
r"""
Slice the 1D tensor
Args:
tensor (:class:`torch.Tensor`): The tensor to slice
Returns:
:class:`torch.Tensor`: The sliced tensor
"""
if n_cast is None:
return tensor.chunk(self.shardconfig.world_size, dim=0)[self.shardconfig.rank].contiguous()
else:
tensor_chunks = tensor.chunk(self.shardconfig.world_size * n_cast, dim=0)
chunk_list = [
tensor_chunks[i] for i in range(self.shardconfig.rank, len(tensor_chunks), self.shardconfig.world_size)
]
return torch.cat(chunk_list, dim=0).contiguous()
def slice_col(
self,
tensor: torch.Tensor,
n_cast: int = None,
) -> torch.Tensor:
r"""
Slice the tensor in column
Args:
tensor (:class:`torch.Tensor`): The tensor to slice
Returns:
:class:`torch.Tensor`: The sliced tensor
"""
if n_cast is None:
return tensor.chunk(self.shardconfig.world_size, dim=0)[self.shardconfig.rank].contiguous()
else:
tensor_chunks = tensor.chunk(self.shardconfig.world_size * n_cast, dim=0)
chunk_list = [
tensor_chunks[i] for i in range(self.shardconfig.rank, len(tensor_chunks), self.shardconfig.world_size)
]
return torch.cat(chunk_list, dim=0).contiguous()
def slice_row(
self,
tensor: torch.Tensor,
n_cast: int = None,
) -> torch.Tensor:
r"""
Slice the tensor in column
Args:
tensor (:class:`torch.Tensor`): The tensor to slice
Returns:
:class:`torch.Tensor`: The sliced tensor
"""
if n_cast is None:
return tensor.chunk(self.shardconfig.world_size, dim=1)[self.shardconfig.rank].contiguous()
else:
tensor_chunks = tensor.chunk(self.shardconfig.world_size * n_cast, dim=1)
chunk_list = [
tensor_chunks[i] for i in range(self.shardconfig.rank, len(tensor_chunks), self.shardconfig.world_size)
]
return torch.cat(chunk_list, dim=1).contiguous()
parallel = dict(data=1, pipeline=1, tensor=dict(size=2, mode='1d'))
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import colossalai
from colossalai.shardformer.layer.dist_crossentropy import applyDistCrossEntropy
from colossalai.shardformer.layer.dropout import Dropout1D
def get_args():
parser = colossalai.get_default_parser()
parser.add_argument("--module", type=str, default='distloss')
return parser.parse_args()
def test_dist_crossentropy():
pred = torch.randn(2, 4, 8, requires_grad=True)
labels = torch.randint(8, (1, 4)).repeat(2, 1)
pred_ = pred.view(-1, 8)
labels_ = labels.view(-1)
loss = F.cross_entropy(pred_, labels_)
loss.backward()
print(f"normal loss:{loss}")
pred = pred.chunk(int(os.environ['WORLD_SIZE']), -1)[int(os.environ['RANK'])]
loss = applyDistCrossEntropy(pred.to('cuda'), labels.to('cuda'))
loss.backward()
print(f"dist loss:{loss}")
def test_dropout():
input = torch.randn(5, 4).to("cuda")
m = Dropout1D(p=0.2).to("cuda")
for i in range(2):
print(f"Output: {m(input)}")
print(torch.randn(1))
if __name__ == '__main__':
args = get_args()
colossalai.launch_from_torch(config={})
if args.module == 'distloss':
test_dist_crossentropy()
elif args.module == 'dropout':
test_dropout()
else:
print("not implemented yet")
import os
import random
import torch
import torch.nn as nn
from datasets import load_dataset
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
from transformers import AutoTokenizer, BertForMaskedLM, DataCollatorForLanguageModeling, GPT2LMHeadModel, get_scheduler
import colossalai
from colossalai.shardformer.shard import ShardConfig, shard_model
from colossalai.utils import get_current_device, print_rank_0
os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = 'true'
def get_args():
parser = colossalai.get_default_parser()
parser.add_argument("--mode", type=str, default='inference')
parser.add_argument("--save_model", action='store_true')
parser.add_argument("--model", type=str, default='bert-base-uncased')
return parser.parse_args()
def load_data(args):
tokenizer = AutoTokenizer.from_pretrained(args.model)
if tokenizer.pad_token is None:
tokenizer.add_special_tokens({"pad_token": "[PAD]"})
# tokenizer.pad_token_id = 0
datasets = load_dataset('wikitext', 'wikitext-2-raw-v1')
# datasets=load_dataset("yelp_review_full")
tokenized_datasets = datasets.map(
lambda examples: tokenizer(examples["text"], truncation=True, padding="max_length"), batched=True)
tokenized_datasets = tokenized_datasets.remove_columns(["text"])
# tokenized_datasets=tokenized_datasets.rename_column("label","labels")
tokenized_datasets.set_format("torch")
train_dataset = tokenized_datasets["train"]
test_dataset = tokenized_datasets["test"]
datacollector = DataCollatorForLanguageModeling(tokenizer, mlm=True, mlm_probability=0.15, return_tensors="pt")
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=datacollector)
eval_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=True, collate_fn=datacollector)
return train_dataloader, eval_dataloader
def inference(model: nn.Module, args):
print(model)
# print(model.wte.weight.shape)
tokenizer = AutoTokenizer.from_pretrained(args.model)
if tokenizer.pad_token is None:
tokenizer.add_special_tokens({"pad_token": "[PAD]"})
tokenizer.pad_token_id = 0
token = "Hello, my dog is cute"
inputs = tokenizer(token, return_tensors="pt")
inputs.to("cuda")
model.eval()
model.to("cuda")
outputs = model(**inputs)
print(outputs[0])
def train(model: nn.Module, args, num_epoch: int = 3):
train_dataloader, eval_dataloader = load_data(args)
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
num_training = num_epoch * len(train_dataloader)
progress_bar = tqdm(range(num_training))
lr_scheduler = get_scheduler(name="linear",
optimizer=optimizer,
num_warmup_steps=0,
num_training_steps=num_training)
best_test_loss = float("inf")
model.to("cuda")
model.train()
for epoch in range(num_epoch):
progress_bar.set_description(f"Rank {get_current_device()} epoch {epoch}")
for batch in train_dataloader:
optimizer.zero_grad()
batch = {k: v.to('cuda') for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
lr_scheduler.step()
progress_bar.update(1)
train_loss = loss
loss = 0.0
for batch in eval_dataloader:
batch = {k: v.to('cuda') for k, v in batch.items()}
outputs = model(**batch)
# loss = outputs.loss
assert not torch.isnan(outputs.loss), f"{batch}"
loss += outputs.loss.item()
# loss = criterion(outputs.logits, batch["input_ids"])
test_loss = loss / len(eval_dataloader)
print_rank_0(f"Train Loss: {train_loss:.4f} Test Loss:{test_loss:.4f}")
if args.save_model and test_loss < best_test_loss:
best_test_loss = test_loss
torch.save(model.state_dict(), "./checkpoints/best_model.pth")
if __name__ == "__main__":
args = get_args()
colossalai.launch_from_torch(config=args.config)
if args.model == 'bert-base-uncased':
model = BertForMaskedLM.from_pretrained("bert-base-uncased")
elif args.model == 'gpt2':
model = GPT2LMHeadModel.from_pretrained("gpt2")
else:
raise AttributeError("model not supported")
shard_config = ShardConfig(
rank=int(str(get_current_device()).split(':')[-1]),
world_size=int(os.environ['WORLD_SIZE']),
)
sharded_model = shard_model(model, shard_config)
if args.mode == "train":
train(sharded_model, args)
elif args.mode == "inference":
inference(sharded_model, args)
else:
raise NotImplementedError
def hasattr_(obj, attr: str):
r"""
Check whether the object has the multi sublevel attr
Args:
obj (object): The object to check
attr (str): The multi level attr to check
"""
attrs = attr.split('.')
for a in attrs:
try:
obj = getattr(obj, a)
except AttributeError:
return False
return True
def setattr_(obj, attr: str, value, ignore: bool = False):
r"""
Set the object's multi sublevel attr to value, if ignore, ignore when it doesn't exist
Args:
obj (object): The object to set
attr (str): The multi level attr to set
value (Any): The value to set
ignore (bool): Whether to ignore when the attr doesn't exist
"""
attrs = attr.split('.')
for a in attrs[:-1]:
try:
obj = getattr(obj, a)
except AttributeError:
if ignore:
return
raise AttributeError(f"Object {obj} has no attribute {attr}")
setattr(obj, attrs[-1], value)
def getattr_(obj, attr: str, ignore: bool = None):
r"""
Get the object's multi sublevel attr
Args:
obj (object): The object to set
attr (str): The multi level attr to set
ignore (bool): Whether to ignore when the attr doesn't exist
"""
attrs = attr.split('.')
for a in attrs:
try:
obj = getattr(obj, a)
except AttributeError:
if ignore:
return None
raise AttributeError(f"Object {obj} has no attribute {attr}")
return obj
......@@ -16,66 +16,69 @@ def _all_gather(tensor, comm_spec):
'''
Implement all gather operation on device mesh based on information provided by comm_spec.
'''
process_groups = comm_spec.device_mesh.get_process_group_for_all_axes()
process_group = process_groups[comm_spec.logical_process_axis]
tensor_list = [
torch.zeros(tensor.shape, dtype=tensor.dtype, device=tensor.device)
for _ in range(comm_spec.device_mesh.mesh_shape[comm_spec.logical_process_axis])
]
# without this contiguous operation, the all gather may get some unexpected results.
tensor = tensor.contiguous()
dist.all_gather(tensor_list, tensor, group=process_group)
output = torch.cat(tuple(tensor_list), comm_spec.gather_dim).contiguous()
return output
process_groups_list = comm_spec.device_mesh.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, process_group in process_groups_list:
if dist.get_rank() in rank_list:
tensor_list = [
torch.zeros(tensor.shape, dtype=tensor.dtype, device=tensor.device)
for _ in range(comm_spec.device_mesh.mesh_shape[comm_spec.logical_process_axis])
]
# without this contiguous operation, the all gather may get some unexpected results.
tensor = tensor.contiguous()
dist.all_gather(tensor_list, tensor, group=process_group)
output = torch.cat(tuple(tensor_list), comm_spec.gather_dim).contiguous()
return output
def _split(tensor, comm_spec):
'''
Implement shard operation on device mesh based on information provided by comm_spec.
'''
process_groups = comm_spec.device_mesh.get_process_group_for_all_axes()
process_group = process_groups[comm_spec.logical_process_axis]
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // dist.get_world_size(process_group)
start = length * dist.get_rank(process_group)
output = torch.narrow(tensor, dim, start, length).contiguous()
return output
process_groups_list = comm_spec.device_mesh.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, _ in process_groups_list:
if dist.get_rank() in rank_list:
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // len(rank_list)
start = length * rank_list.index(dist.get_rank())
output = torch.narrow(tensor, dim, start, length).contiguous()
return output
def _all_to_all(tensor, comm_spec):
'''
Implement all to all operation on device mesh based on information provided by comm_spec.
'''
process_groups = comm_spec.device_mesh.get_process_group_for_all_axes()
process_group = process_groups[comm_spec.logical_process_axis]
world_size = dist.get_world_size(process_group)
new_shape = list(tensor.shape)
new_shape[comm_spec.shard_dim] = new_shape[comm_spec.shard_dim] // world_size
new_shape = torch.Size(new_shape)
output_tensor_list = [torch.zeros(new_shape, dtype=tensor.dtype, device=tensor.device) for _ in range(world_size)]
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // world_size
input_tensor_list = [torch.narrow(tensor, dim, length * i, length).contiguous() for i in range(world_size)]
group = process_group
dist.all_to_all(output_tensor_list, input_tensor_list, group)
output = torch.cat(tuple(output_tensor_list), comm_spec.gather_dim).contiguous()
return output
process_groups_list = comm_spec.device_mesh.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, process_group in process_groups_list:
if dist.get_rank() in rank_list:
new_shape = list(tensor.shape)
new_shape[comm_spec.shard_dim] = new_shape[comm_spec.shard_dim] // len(rank_list)
new_shape = torch.Size(new_shape)
output_tensor_list = [
torch.zeros(new_shape, dtype=tensor.dtype, device=tensor.device) for _ in range(len(rank_list))
]
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // len(rank_list)
input_tensor_list = [
torch.narrow(tensor, dim, length * i, length).contiguous() for i in range(len(rank_list))
]
group = process_group
dist.all_to_all(output_tensor_list, input_tensor_list, group)
output = torch.cat(tuple(output_tensor_list), comm_spec.gather_dim).contiguous()
return output
def _all_reduce(tensor, comm_spec, async_op=False):
'''
Implement all reduce operation on device mesh based on information provided by comm_spec.
'''
process_groups = comm_spec.device_mesh.get_process_group_for_all_axes()
process_group = process_groups[comm_spec.logical_process_axis]
if not tensor.is_contiguous():
tensor = tensor.contiguous()
dist.all_reduce(tensor, op=ReduceOp.SUM, group=process_group, async_op=async_op)
return tensor
process_groups_list = comm_spec.device_mesh.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, process_group in process_groups_list:
if dist.get_rank() in rank_list:
if not tensor.is_contiguous():
tensor = tensor.contiguous()
dist.all_reduce(tensor, op=ReduceOp.SUM, group=process_group, async_op=async_op)
return tensor
def _mix_gather(tensor, comm_spec):
......@@ -411,7 +414,7 @@ class CommSpec:
self.forward_only = forward_only
if isinstance(self.logical_process_axis, list):
if not mix_gather:
self.device_mesh = self.sharding_spec.device_mesh.flatten()
self.device_mesh = self.sharding_spec.device_mesh.flatten_device_mesh
self.logical_process_axis = 0
else:
self.device_meshes = self.sharding_spec.device_mesh.flatten_device_meshes
......
# 🔢 Distributed Tensor
## 📚 Table of Contents
- [🔢 Distributed Tensor](#-distributed-tensor)
- [📚 Table of Contents](#-table-of-contents)
- [🔗 Introduction](#-introduction)
- [📝 Design](#-design)
- [🔨 Usage](#-usage)
- [🎈 Progress Log](#-progress-log)
## 🔗 Introduction
Distributed tensor is a type of tensor that is distributed across multiple devices. It is a wrapper of PyTorch tensor, and it is used to support distributed training.
It can represent the device topology and tensor placement over the devices in the topology. It also provides a set of APIs to manipulate the distributed tensor.
## 📝 Design
Our implementation is inspired by the work [Alpa](https://arxiv.org/abs/2201.12023), which unifies data parallelism and tensor parallelism as intra-op parallelism. It uses notations `S` to represent the sharded dimension and `R` to represent the replicated dimension. For example, given a 2D matrix, `[S, R]` represents the tensor is sharded over the first dimension.
Each sharded dimension will have a subscript to represent its placement over the devices. Assuming we have 4 GPUs and the GPUs are arranged in a 2 x 2 manner. Let's say we have a 2D matrix like below:
```text
[1, 2, 3, 4 ]
A = [4, 5, 6, 7 ]
[8, 9, 10, 11]
[12, 13, 14, 15]
```
`[S0, R]` would mean that the first dimension is sharded over the rows in the device topology.
```text
| --------------------—————————————————————-|
| | |
| [1, 2, 3, 4 ] | [1, 2, 3, 4 ] |
| [4, 5, 6, 7 ] | [4, 5, 6, 7 ] |
| | |
| --------------------——————————————————-----
| | |
| [8, 9, 10, 11] | [8, 9, 10, 11] |
| [12, 13, 14, 15] | [12, 13, 14, 15] |
| | |
| --------------------——————————————————-----
```
`[S01, R]` would mean that the first dimension is sharded over both the row and column in the device topology.
```text
| --------------------—————————————————————-|
| | |
| [1, 2, 3, 4 ] | [4, 5, 6, 7 ] |
| | |
| --------------------——————————————————-----
| | |
| [8, 9, 10, 11] | [12, 13, 14, 15] |
| | |
| --------------------——————————————————-----
```
## 🔨 Usage
A sample API usage is given below.
```python
import torch
import colossalai
from colossalai.device.device_mesh import DeviceMesh
from colossalai.tensor.d_tensor import DTensor, ShardingSpec
colossalai.launch_from_torch(config={})
# define your device mesh
# assume you have 4 GPUs
physical_mesh_id = torch.arange(0, 4).reshape(1, 4)
mesh_shape = (2, 2)
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
# define a tensor
a = torch.rand(16, 32).cuda()
# create sharding spec for the tensor
# assume the sharding spec is [S0, R]
dim_partition_dict = {0: [0]}
sharding_spec = ShardingSpec(a.dim(), dim_partition_dict)
# create a distributed tensor
d_tensor = DTensor(a, device_mesh, sharding_spec)
print(d_tensor)
global_tensor = d_tensor.to_global()
print(global_tensor)
```
## 🎈 Progress Log
- [x] Support layout conversion
- [x] Support sharding on 2D device mesh
- [ ] Support sharding on 3D device mesh
- [ ] Support sharding 4D device mesh
- [ ] Support sharding info saving and offline tensor merge (we can save tensor as dtensor and gather the tensors back to the global tensor based on the sharding info in a single process in CPU, useful for distributed training checkpoint load and save.)
from .d_tensor import DTensor
from .sharding_spec import ShardingSpec
__all__ = ['DTensor', 'ShardingSpec']
......@@ -24,12 +24,12 @@ class CommSpec:
'''
Communication spec is used to record the communication action. It converts the communication spec
to real action which will be used in runtime. It contains comm_pattern to determine the
communication method, process_group_dict to determine the process groups, gather_dim and shard_dim
communication method, process_groups_dict to determine the process groups, gather_dim and shard_dim
to determine the buffer shape, and logical_process_axis
Argument:
comm_pattern(CollectiveCommPattern): decribe the communication method used in this spec.
process_group_dict(Dict): A dict which contains the process groups used to apply this CommSpec.
comm_pattern(CollectiveCommPattern): describe the communication method used in this spec.
process_groups_dict(Dict): A dict which contains the process groups used to apply this CommSpec.
gather_dim(int, Optional): The gather_dim of the tensor will be gathered.
shard_dim(int, Optional): The shard_dim of the tensor will be sharded.
logical_process_axis(Union(int, List[int]), Optional): The mesh_dim to implement the communication action.
......@@ -37,7 +37,7 @@ class CommSpec:
def __init__(self,
comm_pattern: CollectiveCommPattern,
process_group_dict: Dict,
process_groups_dict: Dict,
gather_dim: int = None,
shard_dim: int = None,
logical_process_axis: int = None):
......@@ -45,7 +45,7 @@ class CommSpec:
self.gather_dim = gather_dim
self.shard_dim = shard_dim
self.logical_process_axis = logical_process_axis
self.process_group_dict = process_group_dict
self.process_groups_dict = process_groups_dict
def __repr__(self):
res_list = ["CommSpec:("]
......@@ -92,56 +92,68 @@ def _all_gather(tensor: torch.Tensor, comm_spec: CommSpec):
'''
Implement all gather operation on device mesh based on information provided by comm_spec.
'''
process_group = comm_spec.process_group_dict[comm_spec.logical_process_axis]
world_size = dist.get_world_size(process_group)
tensor_list = [torch.zeros(tensor.shape, dtype=tensor.dtype, device=tensor.device) for _ in range(world_size)]
# without this contiguous operation, the all gather may get some unexpected results.
tensor = tensor.contiguous()
dist.all_gather(tensor_list, tensor, group=process_group)
output = torch.cat(tuple(tensor_list), comm_spec.gather_dim).contiguous()
return output
process_groups_list = comm_spec.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, process_group in process_groups_list:
if dist.get_rank() in rank_list:
tensor_list = [
torch.zeros(tensor.shape, dtype=tensor.dtype, device=tensor.device) for _ in range(len(rank_list))
]
# without this contiguous operation, the all gather may get some unexpected results.
tensor = tensor.contiguous()
dist.all_gather(tensor_list, tensor, group=process_group)
output = torch.cat(tuple(tensor_list), comm_spec.gather_dim).contiguous()
return output
def _split(tensor: torch.Tensor, comm_spec: CommSpec):
'''
Implement shard operation on device mesh based on information provided by comm_spec.
'''
process_group = comm_spec.process_group_dict[comm_spec.logical_process_axis]
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // dist.get_world_size(process_group)
start = length * dist.get_rank(process_group)
output = torch.narrow(tensor, dim, start, length).contiguous()
return output
process_groups_list = comm_spec.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, _ in process_groups_list:
if dist.get_rank() in rank_list:
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // len(rank_list)
start = length * rank_list.index(dist.get_rank())
output = torch.narrow(tensor, dim, start, length).contiguous()
return output
def _all_to_all(tensor: torch.Tensor, comm_spec: CommSpec):
'''
Implement all to all operation on device mesh based on information provided by comm_spec.
'''
process_group = comm_spec.process_group_dict[comm_spec.logical_process_axis]
world_size = dist.get_world_size(process_group)
new_shape = list(tensor.shape)
new_shape[comm_spec.shard_dim] = new_shape[comm_spec.shard_dim] // world_size
new_shape = torch.Size(new_shape)
output_tensor_list = [torch.zeros(new_shape, dtype=tensor.dtype, device=tensor.device) for _ in range(world_size)]
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // world_size
input_tensor_list = [torch.narrow(tensor, dim, length * i, length).contiguous() for i in range(world_size)]
group = process_group
dist.all_to_all(output_tensor_list, input_tensor_list, group)
output = torch.cat(tuple(output_tensor_list), comm_spec.gather_dim).contiguous()
return output
process_groups_list = comm_spec.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, process_group in process_groups_list:
if dist.get_rank() in rank_list:
new_shape = list(tensor.shape)
new_shape[comm_spec.shard_dim] = new_shape[comm_spec.shard_dim] // len(rank_list)
new_shape = torch.Size(new_shape)
output_tensor_list = [
torch.zeros(new_shape, dtype=tensor.dtype, device=tensor.device) for _ in range(len(rank_list))
]
dim = comm_spec.shard_dim
length = tensor.shape[comm_spec.shard_dim] // len(rank_list)
input_tensor_list = [
torch.narrow(tensor, dim, length * i, length).contiguous() for i in range(len(rank_list))
]
group = process_group
dist.all_to_all(output_tensor_list, input_tensor_list, group)
output = torch.cat(tuple(output_tensor_list), comm_spec.gather_dim).contiguous()
return output
def _all_reduce(tensor: torch.Tensor, comm_spec: CommSpec, async_op: bool = False):
'''
Implement all reduce operation on device mesh based on information provided by comm_spec.
'''
process_group = comm_spec.process_group_dict[comm_spec.logical_process_axis]
if not tensor.is_contiguous():
tensor = tensor.contiguous()
dist.all_reduce(tensor, op=ReduceOp.SUM, group=process_group, async_op=async_op)
return tensor
process_groups_list = comm_spec.process_groups_dict[comm_spec.logical_process_axis]
for rank_list, process_group in process_groups_list:
if dist.get_rank() in rank_list:
if not tensor.is_contiguous():
tensor = tensor.contiguous()
dist.all_reduce(tensor, op=ReduceOp.SUM, group=process_group, async_op=async_op)
return tensor
class _ReduceGrad(torch.autograd.Function):
......@@ -257,7 +269,7 @@ class _AllToAll(torch.autograd.Function):
def forward(ctx, input_, comm_spec):
output = _all_to_all(input_, comm_spec)
comm_spec_for_backward = CommSpec(comm_pattern=comm_spec.comm_pattern,
process_group_dict=comm_spec.process_group_dict,
process_groups_dict=comm_spec.process_groups_dict,
gather_dim=comm_spec.shard_dim,
shard_dim=comm_spec.gather_dim,
logical_process_axis=comm_spec.logical_process_axis)
......
......@@ -3,119 +3,55 @@ from typing import Optional
import torch
from torch.utils._pytree import tree_map
from colossalai.device.device_mesh import DeviceMesh
from .layout import Layout
from .layout_converter import LayoutConverter, to_global
from .sharding_spec import ShardingSpec
__all__ = ['DTensor', 'distribute_tensor', 'distribute_module', 'construct_default_sharding_spec']
layout_converter = LayoutConverter()
class DTensor(torch.Tensor):
"""
DTensor stands for distributed tensor. It is a subclass of `torch.Tensor` and contains meta information
about the tensor distribution. The meta information includes the device mesh, the sharding specification,
and the entire shape of the tensor.
During runtime, we will not directly use the DTensor objects for computation. Instead, we will only use the
`DTensor.local_tensor` for computation. The `DTensor.local_tensor` is the local tensor in the current rank.
In this way, all tensors involved in computation will only be native PyTorch tensors.
Example:
```python
from colossalai.device import DeviceMesh
# define your device mesh
# assume you have 4 GPUs
physical_mesh_id = torch.arange(0, 4).reshape(1, 4)
mesh_shape = (2, 2)
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape)
# define a tensor
x = torch.rand(16, 32)
# create sharding spec for the tensor
# assume the sharding spec is [S, R]
dim_partition_dict = {
0: 1
}
sharding_spec = ShardingSpec(a.dim(), dim_partition_dict)
# create a distributed tensor
d_tensor = DTensor(x, device_mesh, sharding_spec)
```
Args:
tensor (`torch.Tensor`): the unsharded tensor.
device_mesh (`DeviceMesh`): the device mesh for abstraction of the compute devices.
sharding_spec (`ShardingSpec`): the sharding specification which describes how the tensor will be sharded.
"""
def __init__(self, tensor: torch.Tensor, device_mesh: DeviceMesh, sharding_spec: ShardingSpec):
# ensure this tensor is not a DTensor
assert not isinstance(tensor, DTensor), 'The input tensor should not be a DTensor.'
# store meta info
self.local_tensor = tensor
self.data_type = tensor.dtype
self.global_shape = tensor.shape
# create distributed layout
dist_layout = Layout(device_mesh=device_mesh, sharding_spec=sharding_spec, global_shape=self.global_shape)
def __init__(self, local_tensor: torch.Tensor, dist_layout: Layout):
self.local_tensor = local_tensor
self.data_type = local_tensor.dtype
self.entire_shape = local_tensor.shape
self.dist_layout = dist_layout
# shard the tensor
self._apply_layout()
@staticmethod
def __new__(cls, tensor, *args, **kwargs):
return torch.Tensor._make_subclass(cls, tensor, tensor.requires_grad)
def __new__(cls, local_tensor, layout):
return torch.Tensor._make_subclass(cls, local_tensor, local_tensor.requires_grad)
def __repr__(self):
return f"DTensor(\n{self.to_global()}\n{self.dist_layout}"
return f"DTensor({self.to_global()}, {self.dist_layout})"
def __str__(self):
return self.__repr__()
def layout_convert(self, device_mesh: DeviceMesh, sharding_spec: ShardingSpec) -> None:
def layout_convert(self, target_layout):
'''
Convert the layout of the tensor from source_spec to target_spec.
This will update the `local_tensor` and `dist_layout` in place.
Args:
target_layout (Layout): the target layout specification.
'''
target_layout = Layout(device_mesh=device_mesh, sharding_spec=sharding_spec, global_shape=self.global_shape)
self.local_tensor = layout_converter.apply(tensor=self.local_tensor,
source_layout=self.dist_layout,
target_layout=target_layout)
self.local_tensor = layout_converter.apply(self.local_tensor, self.dist_layout, target_layout)
self.dist_layout = target_layout
def _apply_layout(self):
'''
Apply the layout to the local tensor during initializing process.
'''
# layout converter requires a source and target laytout
# we construct the source layer for an unsharded tensor
# and use self.dist_layer as the targer layout for the sharded tensor
source_spec = construct_default_sharding_spec(self.local_tensor)
source_layout = Layout(device_mesh=self.dist_layout.device_mesh,
device_type=self.dist_layout.device_type,
sharding_spec=source_spec,
global_shape=self.global_shape)
self.local_tensor = layout_converter.apply(tensor=self.local_tensor,
source_layout=source_layout,
target_layout=self.dist_layout)
entire_shape=self.entire_shape)
self.local_tensor = layout_converter.apply(self.local_tensor, source_layout, self.dist_layout)
@classmethod
def __torch_function__(cls, func, types, args=(), kwargs=None):
if kwargs is None:
kwargs = {}
# convert all DTensors to native pytorch tensors
# so that operations will be conducted on native tensors
def filter_arg(arg):
if isinstance(arg, DTensor):
return arg.local_tensor
......@@ -124,9 +60,9 @@ class DTensor(torch.Tensor):
args = tree_map(filter_arg, args)
kwargs = tree_map(filter_arg, kwargs)
# NOTE: if we want to convert the result into DTensor, we need to infer the layout of result from the layout of input tensors
# if we want to convert the result into DTensor, we need to infer the layout of result from the layout of input tensors
# and op type.
return func(*args, **kwargs)
@property
......@@ -149,6 +85,7 @@ class DTensor(torch.Tensor):
'''
self.local_tensor = self.local_tensor.to(*args, **kwargs)
self.data_type = self.local_tensor.dtype
self.dist_layout.device_type = self.local_tensor.device
# TODO: update the device mesh process groups or we should just cache
# both the cpu process groups and the cuda process groups?
return self
......@@ -161,7 +98,7 @@ class DTensor(torch.Tensor):
def to_global(self):
'''
Recover the global tensor from the distributed tensor by returning a new `torch.Tensor` object.
Recover the global tensor from the distributed tensor.
Note: This function will all_gather the local tensor to the global tensor and it
will not change the layout of the DTensor. This function is mainly used for debugging or
......@@ -170,29 +107,24 @@ class DTensor(torch.Tensor):
return to_global(self.local_tensor, self.dist_layout)
def distribute_tensor(tensor: torch.Tensor, device_mesh: DeviceMesh, sharding_spec: ShardingSpec) -> DTensor:
def distribute_tensor(local_tensor: torch.Tensor, dist_layout: Layout) -> DTensor:
'''
Distribute the local tensor to the distributed tensor according to the dist_layout specified.
Args:
tensor (`torch.Tensor`): tensor to be distributed.
device_mesh (`DeviceMesh`): the device mesh for abstraction of the compute devices.
sharding_spec (`ShardingSpec`): the sharding specification which describes how the tensor will be sharded.
local_tensor: tensor to be distributed.
dist_layout: the layout specification of the distributed tensor.
Returns:
A 'DTensor' object.
'''
return DTensor(tensor, device_mesh, sharding_spec)
return DTensor(local_tensor, dist_layout)
def distribute_module(module: torch.nn.Module, partition_fn: Optional[callable] = None) -> torch.nn.Module:
'''
This function converts all the parameters in the module to DTensor(DParam).
Args:
module (`torch.nn.Module`): the module to be distributed.
partition_fn (callable): the partition function which will be used to partition the parameters.
Note: This function is subject to future change as the DParam has not been implemented yet.
'''
for name, param in module.named_parameters():
......@@ -206,11 +138,5 @@ def distribute_module(module: torch.nn.Module, partition_fn: Optional[callable]
def construct_default_sharding_spec(tensor: torch.Tensor,) -> ShardingSpec:
'''
Construct the default sharding specification for the tensor.
Args:
tensor (`torch.Tensor`): the tensor to be sharded.
Returns:
A `ShardingSpec` object without any sharding specified.
'''
return ShardingSpec(dim_size=tensor.dim(), dim_partition_dict={})
......@@ -11,32 +11,28 @@ from .sharding_spec import ShardingSpec
class Layout:
"""
Layout of a tensor refers to the tensor placement on the device mesh and how the tensor is sharded over the devices.
"""Layout of a tensor.
Args:
device_mesh (`DeviceMesh`): the device mesh to store the tensor distributed.
sharding_spec (`ShardingSpec`): the sharding specification to describe how the tensor is sharded.
global_shape (`torch.Size`): the entire shape of the global tensor.
Attributes:
device_mesh: the device mesh to store the tensor distributed.
device_type: the type of the device mesh, e.g. 'cpu' or 'cuda'.
sharding_spec: the sharding specification to describe how the tensor is sharded.
entire_shape: the entire shape of the global tensor.
"""
def __init__(self, device_mesh: DeviceMesh, sharding_spec: ShardingSpec, global_shape: torch.Size):
def __init__(self, device_mesh: DeviceMesh, device_type: torch.device, sharding_spec: ShardingSpec,
entire_shape: torch.Size):
self.device_mesh = device_mesh
self.device_type = device_type
self.sharding_spec = sharding_spec
self.global_shape = global_shape
self.entire_shape = entire_shape
self._sanity_check()
def __hash__(self) -> int:
return hash(f'{self.sharding_spec}')
def get_sharded_shape_per_device(self) -> torch.Size:
"""
Compute the shape of the sharded tensor on each device.
Returns:
`torch.Size`: the shape of the sharded tensor on each device.
"""
sharded_shape = list(self.global_shape)
def get_sharded_shape_per_device(self):
sharded_shape = list(self.entire_shape)
for dim, shard_list in self.sharding_spec.dim_partition_dict.items():
mesh_list = [self.device_mesh.mesh_shape[mesh_dim] for mesh_dim in shard_list]
shard_partitions = reduce(operator.mul, mesh_list, 1)
......@@ -60,7 +56,7 @@ class Layout:
# make sure that the sharding for a dimension is divisible by the number of devices
for dim, shard_list in sharding_spec.dim_partition_dict.items():
tensor_dim_size = self.global_shape[dim]
tensor_dim_size = self.entire_shape[dim]
num_devices = 1
for element in shard_list:
......
......@@ -3,8 +3,10 @@ from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Tuple
import numpy as np
import torch
from colossalai.auto_parallel.tensor_shard.sharding_strategy import MemoryCost, TrainCycleItem
from colossalai.context.singleton_meta import SingletonMeta
from colossalai.tensor.d_tensor.comm_spec import *
from colossalai.tensor.d_tensor.layout import Layout
......@@ -26,21 +28,13 @@ class LayoutConverterOptions:
pass
def to_global(distributed_tensor: "DTensor", layout: Layout) -> torch.Tensor:
"""
Convert a distributed tensor to the global tensor with the given layout.
This function returns a native `torch.Tensor` object.
Args:
distributed_tensor (`DTensor`): the distributed tensor to be converted.
layout (`Layout`): the target layout specification.
"""
def to_global(distributed_tensor: torch.Tensor, layout: Layout) -> torch.Tensor:
layout_converter = LayoutConverter()
global_sharding_spec = ShardingSpec(distributed_tensor.dim(), {})
global_layout = Layout(device_mesh=layout.device_mesh,
device_type=layout.device_type,
sharding_spec=global_sharding_spec,
global_shape=layout.global_shape)
entire_shape=layout.entire_shape)
with torch.no_grad():
global_tensor = layout_converter.apply(distributed_tensor, layout, global_layout)
return global_tensor
......@@ -55,9 +49,6 @@ def set_layout_converting_options(options: LayoutConverterOptions):
class LayoutConverter(metaclass=SingletonMeta):
"""
LayoutConverter is a singleton class which converts the layout of a distributed tensor.
"""
def __init__(self):
self._options = None
......@@ -100,14 +91,15 @@ class LayoutConverter(metaclass=SingletonMeta):
# [[0, 1,
# [2, 3]]
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
global_shape = (4, 4, 4)
entire_shape = (4, 4, 4)
dim_partition_dict = {0: [0], 1: [1]}
# [S0,S1,R]
sharding_spec = ShardingSpec(dim_size=3, dim_partition_dict=dim_partition_dict)
layout = Layout(device_mesh=device_mesh,
device_type=torch.device('cuda'),
sharding_spec=sharding_spec,
global_shape=global_shape)
entire_shape=entire_shape)
rst_dict = layout_converter.all_gather_transform_layouts(layout)
for layout, comm_spec in rst_dict.items():
......@@ -120,12 +112,7 @@ class LayoutConverter(metaclass=SingletonMeta):
valid_spec_dict = {}
comm_pattern = CollectiveCommPattern.GATHER_FWD_SPLIT_BWD
source_spec = source_layout.sharding_spec
# the key of the dict is the axis
# the value is the process group
current_rank = source_layout.device_mesh._global_rank_of_current_process
process_group_dict = source_layout.device_mesh._process_group_dict[current_rank]
process_groups_dict = source_layout.device_mesh.process_groups_dict
for target_pair in source_spec.dim_partition_dict.items():
shard_list = all_gather_simulator(target_pair)
index = target_pair[0]
......@@ -143,7 +130,7 @@ class LayoutConverter(metaclass=SingletonMeta):
logical_process_axis = target_pair[1][-1]
comm_spec = CommSpec(
comm_pattern,
process_group_dict=process_group_dict,
process_groups_dict=process_groups_dict,
gather_dim=gather_dim,
# shard_dim will be used during backward
shard_dim=gather_dim,
......@@ -154,7 +141,8 @@ class LayoutConverter(metaclass=SingletonMeta):
new_sharding_spec = ShardingSpec(source_spec.dims, dim_partition_dict=new_dim_partition_dict)
new_layout = Layout(device_mesh=source_layout.device_mesh,
sharding_spec=new_sharding_spec,
global_shape=source_layout.global_shape)
device_type=source_layout.device_type,
entire_shape=source_layout.entire_shape)
valid_spec_dict[new_layout] = comm_spec
except LayoutException:
......@@ -179,14 +167,15 @@ class LayoutConverter(metaclass=SingletonMeta):
# [[0, 1,
# [2, 3]]
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
global_shape = (4, 4, 4)
entire_shape = (4, 4, 4)
dim_partition_dict = {0: [0], 1: [1]}
# [S0,S1,R]
sharding_spec = ShardingSpec(dim_size=3, dim_partition_dict=dim_partition_dict)
layout = Layout(device_mesh=device_mesh,
device_type=torch.device('cuda'),
sharding_spec=sharding_spec,
global_shape=global_shape)
entire_shape=entire_shape)
rst_dict = layout_converter.all_to_all_transform_layout(layout)
for layout, comm_spec in rst_dict.items():
......@@ -199,12 +188,7 @@ class LayoutConverter(metaclass=SingletonMeta):
'''
valid_spec_dict = {}
comm_pattern = CollectiveCommPattern.ALL2ALL_FWD_ALL2ALL_BWD
# the key of the dict is the axis
# the value is the process group
current_rank = source_layout.device_mesh._global_rank_of_current_process
process_group_dict = source_layout.device_mesh._process_group_dict[current_rank]
process_groups_dict = source_layout.device_mesh.process_groups_dict
source_spec = source_layout.sharding_spec
tensor_dims = source_spec.dims
for f_index in range(tensor_dims - 1):
......@@ -245,7 +229,7 @@ class LayoutConverter(metaclass=SingletonMeta):
shard_dim = f_index
logical_process_axis = b_target_pair[1][-1]
comm_spec = CommSpec(comm_pattern,
process_group_dict=process_group_dict,
process_groups_dict,
gather_dim=gather_dim,
shard_dim=shard_dim,
logical_process_axis=logical_process_axis)
......@@ -268,7 +252,8 @@ class LayoutConverter(metaclass=SingletonMeta):
new_sharding_spec = ShardingSpec(source_spec.dims, dim_partition_dict=new_dim_partition_dict)
new_layout = Layout(device_mesh=source_layout.device_mesh,
sharding_spec=new_sharding_spec,
global_shape=source_layout.global_shape)
device_type=source_layout.device_type,
entire_shape=source_layout.entire_shape)
valid_spec_dict[new_layout] = comm_spec
except LayoutException:
pass
......@@ -293,15 +278,16 @@ class LayoutConverter(metaclass=SingletonMeta):
# [[0, 1,
# [2, 3]]
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
global_shape = (4, 4, 4)
entire_shape = (4, 4, 4)
dim_partition_dict = {0: [0]}
# [S0,R,R]
sharding_spec = ShardingSpec(dim_size=3, dim_partition_dict=dim_partition_dict)
layout = Layout(device_mesh=device_mesh,
device_type=torch.device('cuda'),
sharding_spec=sharding_spec,
global_shape=global_shape)
entire_shape=entire_shape)
rst_dict = layout_converter.shard_transform_layout(layout)
for layout, comm_spec in rst_dict.items():
......@@ -315,11 +301,7 @@ class LayoutConverter(metaclass=SingletonMeta):
valid_spec_dict = {}
comm_pattern = CollectiveCommPattern.SPLIT_FWD_GATHER_BWD
source_spec = source_layout.sharding_spec
# the key of the dict is the axis
# the value is the process group
current_rank = source_layout.device_mesh._global_rank_of_current_process
process_group_dict = source_layout.device_mesh._process_group_dict[current_rank]
process_groups_dict = source_layout.device_mesh.process_groups_dict
# legal sharding dims means the mesh_id is still available to use.
legal_sharding_dims = [i for i in range(len(source_layout.device_mesh.mesh_shape))]
......@@ -347,7 +329,7 @@ class LayoutConverter(metaclass=SingletonMeta):
shard_dim = index
logical_process_axis = shard_list[-1]
comm_spec = CommSpec(comm_pattern,
process_group_dict=process_group_dict,
process_groups_dict,
gather_dim=shard_dim,
shard_dim=shard_dim,
logical_process_axis=logical_process_axis)
......@@ -358,7 +340,8 @@ class LayoutConverter(metaclass=SingletonMeta):
dim_partition_dict=new_dim_partition_dict)
new_layout = Layout(device_mesh=source_layout.device_mesh,
sharding_spec=new_sharding_spec,
global_shape=source_layout.global_shape)
device_type=source_layout.device_type,
entire_shape=source_layout.entire_shape)
valid_spec_dict[new_layout] = comm_spec
except LayoutException:
pass
......@@ -416,7 +399,7 @@ class LayoutConverter(metaclass=SingletonMeta):
# [[0, 1,
# [2, 3]]
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
global_shape = (4, 4, 4)
entire_shape = (4, 4, 4)
dim_partition_source = {1: [0, 1]}
dim_partition_target = {0: [0, 1]}
......@@ -424,14 +407,16 @@ class LayoutConverter(metaclass=SingletonMeta):
# [R,S01,R]
sharding_spec_source = ShardingSpec(dim_size=3, dim_partition_dict=dim_partition_source)
source_layout = Layout(device_mesh=device_mesh,
device_type=torch.device('cuda'),
sharding_spec=sharding_spec_source,
global_shape=global_shape)
entire_shape=entire_shape)
# [S01,R,R]
sharding_spec_target = ShardingSpec(dim_size=3, dim_partition_dict=dim_partition_target)
target_layout = Layout(device_mesh=device_mesh,
device_type=torch.device('cuda'),
sharding_spec=sharding_spec_target,
global_shape=global_shape)
entire_shape=entire_shape)
transform_path, comm_action_sequence = layout_converter.layout_converting(source_layout, target_layout)
transform_path_str = '->'.join([str(layout.sharding_spec.sharding_sequence) for layout in transform_path])
......@@ -520,19 +505,21 @@ class LayoutConverter(metaclass=SingletonMeta):
# [[0, 1,
# [2, 3]]
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
global_shape = (4, 4, 4)
entire_shape = (4, 4, 4)
# [S0,R,R]
sharding_spec_source = ShardingSpec(dim_size=3, dim_partition_dict=dim_partition_source)
source_layout = Layout(device_mesh=device_mesh,
device_type=torch.device('cuda'),
sharding_spec=sharding_spec_source,
global_shape=global_shape)
entire_shape=entire_shape)
# [R,S0,R]
sharding_spec_target = ShardingSpec(dim_size=3, dim_partition_dict=dim_partition_target)
target_layout = Layout(device_mesh=device_mesh,
device_type=torch.device('cuda'),
sharding_spec=sharding_spec_target,
global_shape=global_shape)
entire_shape=entire_shape)
if rank in (0, 1):
sharded_tensor_0 = torch.zeros(2, 1)
......@@ -567,4 +554,3 @@ class LayoutConverter(metaclass=SingletonMeta):
for comm_spec in comm_action_sequence:
tensor = comm_spec.covert_spec_to_action(tensor)
return tensor
return tensor
......@@ -116,21 +116,21 @@ class DimSpec:
def dim_diff(self, other):
'''
The difference between two DimSpec.
The difference between two _DimSpec.
Argument:
other(DimSpec): the dim spec to compare with.
other(_DimSpec): the dim spec to compare with.
Return:
difference(int): the difference between two _DimSpec.
Example:
```python
dim_spec = DimSpec([0])
other_dim_spec = DimSpec([0, 1])
dim_spec = _DimSpec([0])
other_dim_spec = _DimSpec([0, 1])
print(dim_spec.difference(other_dim_spec))
# output: 5
```
Output:
5
'''
difference = self.difference_dict[(str(self), str(other))]
return difference
......@@ -142,13 +142,9 @@ class ShardingSpec:
[R, R, S0, S1], which means
Argument:
dim_size (int): The number of dimensions of the tensor to be sharded.
dim_partition_dict (Dict[int, List[int]], optional): The key is the dimension of tensor to be sharded,
and the value of the key describe which logical axis will be sharded in that dimension. Defaults to None.
E.g. {0: [0, 1]} means the first dimension of the tensor will be sharded in logical axis 0 and 1.
sharding_sequence (List[DimSpec], optional): A straight view of ShardingSpec looks like [R, R, S0, S1].
Generally, users should specify either dim_partition_dict or sharding_sequence.
If both are given, users must ensure that they are consistent with each other. Defaults to None.
dim_partition_dict(Dict[int, List[int]], optional): The key is the dimension of tensor to be sharded,
and the value of the key describe which logical axis will be sharded in that dimension.
sharding_sequence(List[DimSpec], optional): A straight view of ShardingSpec looks like [R, R, S0, S1].
'''
def __init__(self,
......@@ -212,7 +208,6 @@ class ShardingSpec:
pair of sharding sequence.
Example:
```python
dim_partition_dict = {0: [0, 1]}
# DistSpec:
# shard_sequence: S01,R,R
......@@ -224,8 +219,10 @@ class ShardingSpec:
# device_mesh_shape: (4, 4)
sharding_spec_to_compare = ShardingSpec(device_mesh, entire_shape, dim_partition_dict_to_compare)
print(sharding_spec.sharding_sequence_difference(sharding_spec_to_compare))
# output: 25
```
Output:
25
Argument:
other(ShardingSpec): The ShardingSpec to compared with.
......
......@@ -64,7 +64,6 @@
},
"features/pipeline_parallel",
"features/nvme_offload",
"features/lazy_init",
"features/cluster_utils"
]
},
......
# Lazy initialization
Author: Hongxin Liu
**Prerequisite**
- [Booster API](../basics/booster_api.md)
- [Booster Plugins](../basics/booster_plugins.md)
- [Booster Checkpoint](../basics/booster_checkpoint.md)
**Related discussion**
- [Lazy initialization of model](https://github.com/hpcaitech/ColossalAI/discussions/3124)
## Introduction
LazyTensor allows DL framework (PyTorch) to execute operations lazily, by storing all operations related to it and reruning them when it's required to be materialized.
LazyInit defers model initialization and it's based on LazyTensor.
This is especially useful when we use model parallelism to train large models, in which case the model cannot fit in GPU memory. Through this, we can initialize model tensors using meta tensor and do static analysis to get shard strategy. And then materialize each tensor and apply the shard strategy. The static analysis can be omitted if the shard strategy is known in advance.
## Usage
You may use lazy initialization when using Gemini, tensor parallelism, pipeline parallelism, and auto-parallelism. In other cases, you may not need to use lazy initialization.
Gemini is compatible with lazy initialization. You can use them together directly.
```python
from colossalai.booster import Booster
from colossalai.booster.plugin import GeminiPlugin
from colossalai.lazy import LazyInitContext
from colossalai.nn.optimizer import HybridAdam
from torch.nn import Linear
import colossalai
colossalai.launch_from_torch({})
plugin = GeminiPlugin()
booster = Booster(plugin=plugin)
with LazyInitContext():
model = Linear(10, 10)
optimizer = HybridAdam(model.parameters())
model, optimizer, *_ = booster.boost(model, optimizer)
```
Note that using lazy initialization when using Gemini is not necessary but recommended. If you don't use lazy initialization, you may get OOM error when initializing the model. If you use lazy initialization, you can avoid this error.
> ⚠ Lazy initialization support for tensor parallelism, pipeline parallelism, and auto-parallelism is still under development.
### Load from pretrained model
We should not load pretrained weight in `LazyInitContext`. If so, lazy initialization is meaningless, as the checkpoint is loaded and it takes much GPU memory. A recommended way is to initialize model from scratch in `LazyInitContext` and load pretrained weight outside `LazyInitContext` after calling `Booster.boost()`.
<!--- doc-test-ignore-start -->
```python
with LazyInitContext():
model = GPT2LMHeadModel(config)
optimizer = ...
lr_scheduler = ...
dataloader = ...
model, optimizer, lr_scheduler, dataloader = booster.boost(model, optimizer, lr_scheduler, dataloader)
booster.load_model(model, pretrained_path)
```
<!--- doc-test-ignore-end -->
As booster supports both pytorch-fashion checkpoint and huggingface/transformers-fashion pretrained weight, the `pretrained_path` of the above pseudo-code can be either a checkpoint file path or a pretrained weight path. Note that it does not support loading pretrained weights from network. You should download the pretrained weight first and then use a local path.
<!-- doc-test-command: torchrun --standalone --nproc_per_node=1 lazy_init.py -->
# 惰性初始化
作者: Hongxin Liu
**前置教程**
- [Booster API](../basics/booster_api.md)
- [Booster 插件](../basics/booster_plugins.md)
- [Booster Checkpoint](../basics/booster_checkpoint.md)
**相关讨论**
- [模型的惰性初始化](https://github.com/hpcaitech/ColossalAI/discussions/3124)
## 引言
LazyTensor 允许深度学习框架 (PyTorch) 延迟执行操作,方法是存储与其相关的所有操作并在需要具体化时重新运行它们。
LazyInit 基于 LazyTensor,并支持延迟模型初始化。
这在我们使用模型并行来训练大型模型时特别有用,在这种情况下模型无法容纳在 GPU 内存中。通过这个,我们可以使用 Meta 张量初始化模型张量并进行静态分析以获得分片策略。然后具体化每个张量并应用分片策略。如果事先知道分片策略,则可以省略静态分析。
## 用法
您可以在使用 Gemini、张量并行、流水线并行和自动并行时使用惰性初始化。在其他情况下,您可能不需要使用惰性初始化。
Gemini 与惰性初始化兼容。您可以直接将它们一起使用。
```python
from colossalai.booster import Booster
from colossalai.booster.plugin import GeminiPlugin
from colossalai.lazy import LazyInitContext
from colossalai.nn.optimizer import HybridAdam
from torch.nn import Linear
import colossalai
colossalai.launch_from_torch({})
plugin = GeminiPlugin()
booster = Booster(plugin=plugin)
with LazyInitContext():
model = Linear(10, 10)
optimizer = HybridAdam(model.parameters())
model, optimizer, *_ = booster.boost(model, optimizer)
```
请注意,在使用 Gemini 时使用惰性初始化不是必需的,但建议使用。如果不使用惰性初始化,在初始化模型时可能会出现 OOM 错误。如果使用惰性初始化,则可以避免此错误。
> ⚠ 对张量并行、流水线并行和自动并行的惰性初始化支持仍在开发中。
### 从预训练模型加载
我们不应该在 `LazyInitContext` 中加载预训练权重。如果这样,惰性初始化就没有意义,因为检查点已加载并且需要大量 GPU 内存。推荐的方法是在 `LazyInitContext` 中初始化模型,并在调用 `Booster.boost()` 后在 `LazyInitContext` 之外加载预训练权重。
<!--- doc-test-ignore-start -->
```python
with LazyInitContext():
model = GPT2LMHeadModel(config)
optimizer = ...
lr_scheduler = ...
dataloader = ...
model, optimizer, lr_scheduler, dataloader = booster.boost(model, optimizer, lr_scheduler, dataloader)
booster.load_model(model, pretrained_path)
```
<!--- doc-test-ignore-end -->
由于 booster 同时支持 pytorch 风格的 checkpoint 和 huggingface/transformers 风格的预训练权重,上述伪代码的 `pretrained_pa​​th` 可以是 checkpoint 文件路径或预训练权重路径。请注意,它不支持从网络加载预训练权重。您应该先下载预训练的权重,然后使用本地路径。
<!-- doc-test-command: torchrun --standalone --nproc_per_node=1 lazy_init.py -->
import torch
from colossalai.device.device_mesh import DeviceMesh
import torch
def test_device_mesh():
physical_mesh_id = torch.arange(0, 16)
physical_mesh_id = torch.arange(0, 16).reshape(2, 8)
mesh_shape = (4, 4)
# [[0, 1, 2, 3],
# [4, 5, 6, 7],
# [8, 9, 10,11],
# [12,13,14,15]]
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape)
assert device_mesh.global_rank_to_local_rank(5) == [1, 1]
assert device_mesh.global_rank_to_local_rank(11) == [2, 3]
assert device_mesh.get_ranks_in_process_group(axis=1, global_rank=2) == [0, 1, 2, 3]
assert device_mesh.convert_map[5] == [1, 1]
assert device_mesh.convert_map[11] == [2, 3]
assert device_mesh.global_rank_to_process_groups_with_logical_rank(0)[0] == [[0, 0], [1, 0], [2, 0], [3, 0]]
assert device_mesh.global_rank_to_process_groups_with_logical_rank(2)[1] == [[0, 0], [0, 1], [0, 2], [0, 3]]
assert device_mesh.global_rank_to_process_groups_with_global_rank(2)[1] == [0, 1, 2, 3]
if __name__ == '__main__':
......
......@@ -20,12 +20,16 @@ def check_layer(rank, world_size, port):
# [[0, 1,
# [2, 3]]
device_mesh = DeviceMesh(physical_mesh_id, mesh_shape, init_process_group=True)
for axis in range(len(mesh_shape)):
tensor = torch.ones(4).cuda()
pg = device_mesh.get_process_group(axis=axis)
dist.all_reduce(tensor, op=ReduceOp.SUM, group=pg)
assert tensor.equal(tensor_to_check)
logical_pg_dict = {0: [[0, 2], [1, 3]], 1: [[0, 1], [2, 3]]}
logical_process_groups = device_mesh.process_groups_dict
for mesh_dim, pgs in logical_pg_dict.items():
for index, pg in enumerate(pgs):
if rank in pg:
tensor = torch.ones(4).cuda()
group = logical_process_groups[mesh_dim][index][1]
dist.all_reduce(tensor, op=ReduceOp.SUM, group=group)
assert tensor.equal(tensor_to_check)
gpc.destroy()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment