Commit d99506f3 authored by chenzk

v1.0.1

parent 61e92904
import hashlib
import importlib
import json
import os
import sys
from argparse import Namespace
from collections import OrderedDict
from pathlib import Path
package = importlib.import_module("nanotron")
package_path = Path(package.__file__).parent.parent.parent
sys.path.append(str(package_path))
import nanotron.distributed as dist
import torch
from nanotron.data.nanoset import Nanoset
from nanotron.parallel import ParallelContext
from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer
from nanotron.sanity_checks import assert_tensor_synced_across_pg
from tools.preprocess_data import main
def create_dataset_paths(tmp_dir: str, quantity: int):
json_dataset_path = [os.path.join(tmp_dir, f"pytest_{i}.json") for i in range(quantity)]
datatrove_tokenized_dataset_paths = [os.path.join(tmp_dir, f"tokenized_documents_{i}") for i in range(quantity)]
return json_dataset_path, datatrove_tokenized_dataset_paths
def create_dummy_json_dataset(path_to_json: str, dummy_text: str, n_samples: int = 50000):
with open(path_to_json, "a") as json_file:
for sample in range(n_samples):
sample_dict = {"text": f"[{sample}] Hello! Im sample {sample}! And this is my dummy text: {dummy_text}"}
json_file.write(json.dumps(sample_dict))
json_file.write("\n")
def preprocess_dummy_dataset(json_dataset_path: str, datatrove_tokenized_dataset_path: str, tokenizer: str):
# Create args for preprocessing
args = Namespace(
readers="jsonl",
dataset=json_dataset_path,
column="text",
glob_pattern=None,
output_folder=datatrove_tokenized_dataset_path,
tokenizer_name_or_path=tokenizer,
eos_token=None,
n_tasks=1,
logging_dir=None,
)
# tools/preprocess_data.py main
main(args)
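# Illustrative sketch (not part of the original helpers): the typical flow used by the tests
# below is create paths -> write a dummy jsonl -> tokenize it with datatrove. The tokenizer
# name is only an example; running this requires the datatrove preprocessing dependencies.
def _example_preprocess_flow(tmp_dir: str):
    json_paths, tokenized_paths = create_dataset_paths(tmp_dir, quantity=1)
    create_dummy_json_dataset(path_to_json=json_paths[0], dummy_text="hello", n_samples=100)
    preprocess_dummy_dataset(json_paths[0], tokenized_paths[0], tokenizer="openai-community/gpt2")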
def assert_batch_dataloader(
batch: dict, parallel_context: ParallelContext, micro_batch_size: int, sequence_length: int
):
"""
batch (dict): Batch produced from the Dataloader, with keys input_ids, input_mask, label_ids, label_mask
"""
for element in batch:
tensor = batch[element]
# Assert that inputs are only present in input_pp_rank and outputs in output_pp_rank
input_pp_rank, output_pp_rank = 0, int(parallel_context.pp_pg.size() - 1)
if dist.get_rank(parallel_context.pp_pg) == input_pp_rank and element.startswith("input_"):
assert isinstance(tensor, torch.Tensor)
elif dist.get_rank(parallel_context.pp_pg) == output_pp_rank and element.startswith("label_"):
assert isinstance(tensor, torch.Tensor)
else:
assert isinstance(tensor, TensorPointer)
data_class = (
0 # 0 if tensor is from the ids, 1 if TensorPointer and 2 if mask. Used in the data parallel group check
)
# Check shape of mask and ids tensors
if isinstance(tensor, torch.Tensor):
assert tensor.shape == (micro_batch_size, sequence_length)
# TensorPointer case: Check that all TensorPointers from the same tp_pg point to the same group_rank. Create torch.tensor with group_rank
if isinstance(tensor, TensorPointer):
tensor = torch.tensor(tensor.group_rank)
data_class = 1
# Attention Masks case: dtype is torch.bool --> Transform to int64
if tensor.dtype == torch.bool:
tensor = tensor.long()
data_class = 2
# Assert that we have the SAME element in all the processes belonging to the same tensor parallel group
assert_tensor_synced_across_pg(
tensor=tensor.flatten().cuda(),
pg=parallel_context.tp_pg,
msg=lambda err: f"{element} is not synchronized across TP {err}",
)
# Assert that we have the SAME class of data in all processes belonging to the same data parallel group
assert_tensor_synced_across_pg(
tensor=torch.tensor(data_class, device="cuda"),
pg=parallel_context.dp_pg,
msg=lambda err: f"{element} is not synchronized across DP {err}",
)
def compute_hash(identifier: OrderedDict, n_digit: int = 8) -> int:
"""
    Creates a sha256 hash from the elements of an OrderedDict
"""
unique_description = json.dumps(identifier, indent=4)
# Create n_digit description hash
unique_description_hash = int(hashlib.sha256(unique_description.encode("utf-8")).hexdigest(), 16) % 10**n_digit
return unique_description_hash
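# Illustrative sketch (not part of the original helpers): compute_hash is deterministic for
# identical OrderedDicts, which is what the cross-rank synchronization checks below rely on.
def _example_compute_hash():
    identifier = OrderedDict([("sequence_length", 1024), ("random_seed", 1234)])
    assert compute_hash(identifier) == compute_hash(OrderedDict(identifier))
    # The hash is truncated to n_digit decimal digits.
    assert compute_hash(identifier, n_digit=4) < 10**4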
def assert_nanoset_sync_across_all_ranks(nanoset: Nanoset, parallel_context: ParallelContext):
"""
Checks that the same Nanoset is created in all processes
"""
# Extract a sample from the Nanoset
IDX_SAMPLE = 23
nanoset_identifiers = OrderedDict()
nanoset_identifiers["dataset_folders"] = nanoset.dataset_folders
nanoset_identifiers["dataset_weights"] = nanoset.dataset_weights.tolist()
nanoset_identifiers["sequence_length"] = nanoset.sequence_length
nanoset_identifiers["train_split_num_samples"] = nanoset.train_split_num_samples
nanoset_identifiers["random_seed"] = nanoset.random_seed
nanoset_identifiers["length"] = len(nanoset)
nanoset_identifiers["input_ids"] = nanoset[IDX_SAMPLE]["input_ids"].tolist()
nanoset_identifiers["dataset_index"] = nanoset.dataset_index.tolist()
nanoset_identifiers["dataset_sample_index"] = nanoset.dataset_sample_index.tolist()
nanoset_identifiers["token_size"] = nanoset.token_size
unique_description_hash = compute_hash(nanoset_identifiers)
assert_tensor_synced_across_pg(
tensor=torch.tensor(unique_description_hash, device="cuda"),
pg=parallel_context.world_pg,
msg=lambda err: f"Nanoset is not synchronized across all processes {err}",
)
def compute_batch_hash(batch: dict) -> int:
"""
    Computes a hash of a batch. Used to check that the Nanoset/BlendedNanoset is in the same state after recovering from a crash.
batch (dict): Batch produced from the Dataloader, with keys input_ids, input_mask, label_ids, label_mask
"""
batch_identifiers = OrderedDict()
for element in batch:
tensor = batch[element]
# TensorPointer
if isinstance(tensor, TensorPointer):
identifier = tensor.group_rank
# Attention Masks case: dtype is torch.bool --> Transform to int64
elif tensor.dtype == torch.bool:
identifier = tensor.long().tolist()
# Input IDs tensor
else:
identifier = tensor.tolist()
batch_identifiers[element] = identifier
unique_description_hash = compute_hash(batch_identifiers)
return unique_description_hash
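# Illustrative sketch (not part of the original helpers): two structurally identical batches
# hash to the same value, and boolean masks are normalized to int64 before hashing.
def _example_compute_batch_hash():
    batch = {
        "input_ids": torch.arange(8).view(2, 4),
        "input_mask": torch.ones(2, 4, dtype=torch.bool),
    }
    assert compute_batch_hash(batch) == compute_batch_hash(dict(batch))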
import torch
from nanotron import distributed as dist
from nanotron.distributed import ProcessGroup, get_global_rank
def assert_tensor_equal_over_group(tensor: torch.Tensor, group: ProcessGroup, assert_: bool = True) -> bool:
"""We assume that tensors are already of correct size."""
reference_rank = 0
if dist.get_rank(group) == reference_rank:
reference_tensor = tensor
else:
reference_tensor = torch.empty_like(tensor)
dist.broadcast(
reference_tensor,
src=get_global_rank(group=group, group_rank=reference_rank),
group=group,
)
if assert_:
torch.testing.assert_close(tensor, reference_tensor, atol=0, rtol=0)
else:
result = torch.allclose(tensor, reference_tensor, atol=0.0, rtol=0.0)
results = [0] * group.size()
dist.all_gather_object(results, result, group)
return all(results)
from math import ceil
from typing import Union
import torch
from nanotron import distributed as dist
from nanotron.models import init_on_device_and_dtype
from nanotron.optim.base import BaseOptimizer
from nanotron.optim.named_optimizer import NamedOptimizer
from nanotron.parallel import ParallelContext
from nanotron.parallel.parameters import NanotronParameter
from nanotron.parallel.pipeline_parallel.block import PipelineBlock
from nanotron.parallel.pipeline_parallel.p2p import P2P
from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer
from nanotron.parallel.tied_parameters import tie_parameters
from nanotron.parallel.utils import initial_sync
from torch import nn
from torch.nn.parallel import DistributedDataParallel
class DummyModel(nn.Module):
def __init__(
self,
p2p: P2P,
):
super().__init__()
self.p2p = p2p
self.mlp = nn.Sequential(
*(
nn.ModuleDict(
{
"linear": PipelineBlock(
p2p=p2p,
module_builder=nn.Linear,
module_kwargs={"in_features": 10, "out_features": 10},
module_input_keys={"input"},
module_output_keys={"output"},
),
"activation": PipelineBlock(
p2p=p2p,
module_builder=nn.Sigmoid if pp_rank < p2p.pg.size() - 1 else nn.Identity,
module_kwargs={},
module_input_keys={"input"},
module_output_keys={"output"},
),
}
)
for pp_rank in range(p2p.pg.size())
)
)
self.loss = PipelineBlock(
p2p=p2p,
module_builder=lambda: lambda x: x.sum(),
module_kwargs={},
module_input_keys={"x"},
module_output_keys={"output"},
)
def forward(self, x: Union[torch.Tensor, TensorPointer]):
for non_linear in self.mlp:
x = non_linear.linear(input=x)["output"]
x = non_linear.activation(input=x)["output"]
x = self.loss(x=x)["output"]
return x
def init_dummy_model(parallel_context: ParallelContext, dtype: torch.dtype = torch.float) -> DummyModel:
p2p = P2P(pg=parallel_context.pp_pg, device=torch.device("cuda"))
model = DummyModel(p2p=p2p)
# Build model using contiguous segments
pipeline_blocks = [module for name, module in model.named_modules() if isinstance(module, PipelineBlock)]
with init_on_device_and_dtype(device=torch.device("cuda"), dtype=dtype):
contiguous_size = ceil(len(pipeline_blocks) / parallel_context.pp_pg.size())
for i, block in enumerate(pipeline_blocks):
rank = i // contiguous_size
block.build_and_set_rank(rank)
# Sync all parameters that have the same name and that are not sharded across TP.
for name, param in model.named_parameters():
if isinstance(param, NanotronParameter) and param.is_sharded:
continue
shared_weights = [
(
name,
# sync across TP group
tuple(sorted(dist.get_process_group_ranks(parallel_context.tp_pg))),
)
]
tie_parameters(
root_module=model, ties=shared_weights, parallel_context=parallel_context, reduce_op=dist.ReduceOp.SUM
)
initial_sync(model=model, parallel_context=parallel_context)
if len(list(model.named_parameters())) > 0:
model = DistributedDataParallel(model, process_group=parallel_context.dp_pg)
else:
# No parameters, so no need to use DDP to sync parameters gradients
model = model
return model
def init_dummy_optimizer(model: nn.Module, parallel_context: ParallelContext) -> BaseOptimizer:
optimizer = NamedOptimizer(
named_params_or_groups=model.named_parameters(), optimizer_builder=lambda params: torch.optim.AdamW(params)
)
# Synchronize across dp: basic assumption, already done as nothing in optimizer initialization is stochastic
return optimizer
def dummy_infinite_data_loader(pp_pg: dist.ProcessGroup, dtype=torch.float, input_pp_rank=0):
micro_batch_size = 3
# We assume the first linear is always built on the first rank.
current_pp_rank = dist.get_rank(pp_pg)
while True:
yield {
"x": torch.randn(micro_batch_size, 10, dtype=dtype, device="cuda")
if current_pp_rank == input_pp_rank
else TensorPointer(group_rank=input_pp_rank)
}
import contextlib
import signal
from typing import Optional
from nanotron import distributed as dist
@contextlib.contextmanager
def assert_fail_with(exception_class, error_msg: Optional[str] = None):
try:
yield
except exception_class as e:
if error_msg is None:
return
if error_msg == str(e):
return
else:
raise AssertionError(f'Expected message to be "{error_msg}", but got "{str(e)}" instead.')
except Exception as e:
raise AssertionError(f"Expected {exception_class} to be raised, but got: {type(e)} instead:\n{e}")
raise AssertionError(f"Expected {exception_class} to be raised, but no exception was raised.")
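# Illustrative sketch (not part of the original helpers): the block passes only when the
# expected exception (and, if given, the exact message) is raised inside it.
def _example_assert_fail_with():
    with assert_fail_with(ValueError, error_msg="boom"):
        raise ValueError("boom")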
@contextlib.contextmanager
def assert_fail_except_rank_with(
exception_class, rank_exception: int, pg: dist.ProcessGroup, error_msg: Optional[str] = None
):
try:
yield
except exception_class as e:
if rank_exception == dist.get_rank(pg):
raise AssertionError(f"Expected rank {rank_exception} to not raise {exception_class}.")
else:
if error_msg is None:
return
if error_msg == str(e):
return
else:
raise AssertionError(f'Expected message to be "{error_msg}", but got "{str(e)}" instead.')
except Exception as e:
raise AssertionError(f"Expected {exception_class} to be raised, but got: {type(e)} instead:\n{e}")
if dist.get_rank(pg) != rank_exception:
raise AssertionError(f"Expected {exception_class} to be raised, but no exception was raised.")
@contextlib.contextmanager
def timeout_after(ms=500):
"""Timeout context manager."""
def signal_handler(signum, frame):
raise TimeoutError(f"Timed out after {ms} ms.")
signal.signal(signal.SIGALRM, signal_handler)
signal.setitimer(signal.ITIMER_REAL, ms / 1000)
try:
yield
finally:
signal.alarm(0)
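# Illustrative sketch (not part of the original helpers): timeout_after raises TimeoutError
# when the body exceeds the given budget; it relies on SIGALRM, so it is Unix-only.
def _example_timeout_after():
    import time
    with timeout_after(ms=500):
        time.sleep(0.01)  # well under the 500 ms budget, so no TimeoutError is raised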
import torch
from nanotron.config import (
AllForwardAllBackwardPipelineEngine,
CheckpointsArgs,
Config,
DataArgs,
DatasetStageArgs,
GeneralArgs,
LlamaConfig,
LoggingArgs,
LRSchedulerArgs,
ModelArgs,
OptimizerArgs,
ParallelismArgs,
TensorParallelLinearMode,
TokenizerArgs,
TokensArgs,
)
from nanotron.config.config import PretrainDatasetsArgs
from nanotron.models import build_model
from nanotron.models.llama import LlamaForTraining
from nanotron.parallel.context import ParallelContext
from nanotron.trainer import mark_tied_parameters
TINY_LLAMA_CONFIG = LlamaConfig(
**{
"bos_token_id": 1,
"eos_token_id": 2,
"hidden_act": "silu",
"hidden_size": 16,
"initializer_range": 0.02,
"intermediate_size": 32,
"is_llama_config": True,
"max_position_embeddings": 128,
"num_attention_heads": 8,
"num_hidden_layers": 4,
"num_key_value_heads": 4,
"pad_token_id": None,
"pretraining_tp": 1,
"rms_norm_eps": 1e-06,
"rope_scaling": None,
"tie_word_embeddings": False,
"use_cache": True,
"vocab_size": 4096,
}
)
def get_llama_training_config(model_config: ModelArgs):
return Config(
model=model_config,
general=GeneralArgs(project="unittest", run="sanity_llama", seed=42),
checkpoints=CheckpointsArgs(
checkpoints_path="./checkpoints",
checkpoint_interval=10,
),
parallelism=ParallelismArgs(
dp=1,
pp=1,
tp=2,
expert_parallel_size=2,
pp_engine="1f1b",
tp_mode="ALL_REDUCE",
tp_linear_async_communication=False,
),
tokenizer=TokenizerArgs("gpt2"),
optimizer=OptimizerArgs(
zero_stage=0,
weight_decay=0.01,
clip_grad=1.0,
accumulate_grad_in_fp32=False,
adam_eps=1e-08,
adam_beta1=0.9,
adam_beta2=0.95,
torch_adam_is_fused=True,
learning_rate_scheduler=LRSchedulerArgs(
learning_rate=3e-4,
lr_warmup_steps=100,
lr_warmup_style="linear",
lr_decay_style="cosine",
min_decay_lr=1e-5,
),
),
logging=LoggingArgs(),
tokens=TokensArgs(sequence_length=16, train_steps=10, micro_batch_size=16, batch_accumulation_per_replica=1),
data_stages=[
DatasetStageArgs(
name="train",
start_training_step=1,
data=DataArgs(
seed=42,
num_loading_workers=1,
dataset=PretrainDatasetsArgs(
hf_dataset_or_datasets="HuggingFaceH4/testing_alpaca_small",
hf_dataset_splits="train",
text_column_name="completion",
dataset_processing_num_proc_per_process=12,
),
),
)
],
)
def create_llama_from_config(
model_config: LlamaConfig, device: torch.device, parallel_context: ParallelContext
) -> LlamaForTraining:
"""
    Creates and returns a nanotron LlamaForTraining model.
    The model is built from `model_config` on the given `device` and `parallel_context`,
    with randomly initialized weights.
"""
parallel_config = ParallelismArgs(
dp=parallel_context.data_parallel_size,
pp=parallel_context.pipeline_parallel_size,
tp=parallel_context.tensor_parallel_size,
pp_engine=AllForwardAllBackwardPipelineEngine(),
tp_mode=TensorParallelLinearMode.ALL_REDUCE,
tp_linear_async_communication=False,
)
model = build_model(
model_builder=lambda: LlamaForTraining(
config=model_config,
parallel_context=parallel_context,
parallel_config=parallel_config,
random_states=None,
),
parallel_context=parallel_context,
dtype=torch.bfloat16,
device=device,
)
mark_tied_parameters(model=model, parallel_context=parallel_context)
return model
import contextlib
import os
import re
from inspect import signature
from typing import Any, Callable, Dict, List, Optional, Tuple
import torch.cuda
import torch.multiprocessing as mp
from nanotron.parallel import ParallelContext
from packaging import version
def available_gpus():
if not torch.cuda.is_available():
return 0
device_properties = [torch.cuda.get_device_properties(i) for i in range(torch.cuda.device_count())]
    # We filter out GPUs that are only used for display
blacklisted_gpu_names = {"NVIDIA DGX Display"}
device_properties = [property_ for property_ in device_properties if property_.name not in blacklisted_gpu_names]
# TODO @thomasw21: Can we do this cross node
return len(device_properties)
# from https://stackoverflow.com/a/34333710/9201239
@contextlib.contextmanager
def mock_os_environ(remove_keys: List[str] = None, update_key_values: Dict[str, Any] = None):
"""
Temporarily updates the ``os.environ`` dictionary in-place.
The ``os.environ`` dictionary is updated in-place so that the modification is sure to work in all situations.
Args:
remove_keys: Environment variables to remove.
update_key_values: Dictionary of environment variables and values to add/update.
"""
env = os.environ
update_key_values = update_key_values or {}
remove_keys = remove_keys or []
update_keys = set(update_key_values.keys())
remove_keys = set(remove_keys)
assert remove_keys.isdisjoint(update_keys)
stomped = (update_keys | remove_keys) & set(env.keys())
reverse_change = {
# Environment variables and values to restore on exit.
**{k: env[k] for k in update_keys & stomped},
# Environment variables and values to remove on exit.
**{k: env[k] for k in remove_keys & stomped},
}
try:
env.update(update_key_values)
for k in remove_keys:
env.pop(k, None)
yield
finally:
env.update(reverse_change)
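# Illustrative sketch (not part of the original helpers): values overwritten inside the
# context are restored on exit, so tests can tweak the environment without leaking state.
def _example_mock_os_environ():
    os.environ["MY_DUMMY_VAR"] = "old"  # hypothetical variable used only for illustration
    with mock_os_environ(update_key_values={"MY_DUMMY_VAR": "new"}):
        assert os.environ["MY_DUMMY_VAR"] == "new"
    assert os.environ["MY_DUMMY_VAR"] == "old"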
def is_dict_equal(first: Dict, second: Dict, sub_paths: Optional[List[str]] = None) -> Tuple[bool, Optional[str]]:
"""Returns True or False if the dictionaries match, and an additional message when it's False"""
if sub_paths is None:
sub_paths = []
first_keys = set(first.keys())
second_keys = set(second.keys())
if first_keys != second_keys:
return False, f"Keys don't match in {'.'.join(sub_paths)}.\nCur: {first_keys}\nRef: {second_keys}"
for key in first_keys:
first_elt = first[key]
second_elt = second[key]
if isinstance(first_elt, dict):
if not isinstance(second_elt, dict):
return (
False,
f"Object types don't match in {'.'.join(sub_paths + [str(key)])}.\nCur: {first_elt}\nRef: {second_elt}",
)
match, msg = is_dict_equal(first_elt, second_elt, sub_paths=sub_paths + [str(key)])
if match is False:
return False, msg
elif isinstance(first_elt, torch.Tensor):
if not isinstance(second_elt, torch.Tensor):
return (
False,
f"Object types don't match in {'.'.join(sub_paths + [str(key)])}.\nCur: {first_elt}\nRef: {second_elt}",
)
try:
torch.testing.assert_close(
first_elt,
second_elt,
atol=0.0,
rtol=0.0,
msg=lambda msg: f"Tensor at {'.'.join(sub_paths + [str(key)])} don't match.\nCur: {first_elt}\nRef: {second_elt}\n{msg}",
)
except AssertionError as error:
return False, error.args[0]
else:
if first_elt != second_elt:
return (
False,
f"Objects at key {'.'.join(sub_paths + [str(key)])} don't match.\nCur: {first_elt}\nRef: {second_elt}",
)
return True, None
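# Illustrative sketch (not part of the original helpers): is_dict_equal recurses into nested
# dicts, compares tensors exactly, and returns (False, message) on the first mismatch.
def _example_is_dict_equal():
    first = {"a": {"b": torch.zeros(2)}, "c": 1}
    match, msg = is_dict_equal(first, {"a": {"b": torch.zeros(2)}, "c": 1})
    assert match and msg is None
    match, _ = is_dict_equal(first, {"a": {"b": torch.ones(2)}, "c": 1})
    assert not match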
def get_all_3d_configurations(gpus: int) -> List[Tuple[int, int, int]]:
"""Given a number of gpus, we want all 3d configurations possible such that pp * dp * tp = gpus"""
result = []
for tp in range(1, gpus + 1):
if gpus % tp != 0:
continue
gpus_left_after_tp = gpus // tp
for dp in range(1, gpus_left_after_tp + 1):
if gpus_left_after_tp % dp != 0:
continue
gpus_left_after_dp = gpus_left_after_tp // dp
for pp in range(1, gpus_left_after_dp + 1):
if gpus_left_after_dp % pp != 0:
continue
if tp * dp * pp == gpus:
result.append((pp, dp, tp))
return result
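# Illustrative sketch (not part of the original helpers): for 4 GPUs the helper enumerates
# every (pp, dp, tp) factorization of 4, which the parametrized tests below iterate over.
def _example_get_all_3d_configurations():
    configs = get_all_3d_configurations(4)
    assert all(pp * dp * tp == 4 for pp, dp, tp in configs)
    assert (1, 1, 4) in configs and (1, 4, 1) in configs and (4, 1, 1) in configs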
def rerun_if_address_is_in_use(max_try: int = 500):
"""
    Reruns the wrapped function if an "address already in use" error occurs
    in tests spawned with torch.multiprocessing
Credits: https://github.com/hpcaitech/ColossalAI/blob/adae123df3badfb15d044bd416f0cf29f250bc86/colossalai/testing/utils.py#L157
Usage::
@rerun_if_address_is_in_use()
def test_something():
...
"""
# check version
torch_version = version.parse(torch.__version__)
assert torch_version.major >= 1
# only torch >= 1.8 has ProcessRaisedException
if torch_version >= version.parse("1.8.0"):
exception = torch.multiprocessing.ProcessRaisedException
else:
exception = Exception
func_wrapper = rerun_on_exception(exception_type=exception, pattern=".*Address already in use.*", max_try=max_try)
return func_wrapper
def rerun_on_exception(exception_type: Exception = Exception, pattern: str = None, max_try: int = 10) -> Callable:
"""
A decorator on a function to re-run when an exception occurs.
Credits: https://github.com/hpcaitech/ColossalAI/blob/adae123df3badfb15d044bd416f0cf29f250bc86/colossalai/testing/utils.py#L71
Usage::
# rerun for all kinds of exception
@rerun_on_exception()
def test_method():
print('hey')
raise RuntimeError('Address already in use')
# rerun for RuntimeError only
@rerun_on_exception(exception_type=RuntimeError)
def test_method():
print('hey')
raise RuntimeError('Address already in use')
# rerun for maximum 10 times if Runtime error occurs
@rerun_on_exception(exception_type=RuntimeError, max_try=10)
def test_method():
print('hey')
raise RuntimeError('Address already in use')
# rerun for infinite times if Runtime error occurs
@rerun_on_exception(exception_type=RuntimeError, max_try=None)
def test_method():
print('hey')
raise RuntimeError('Address already in use')
        # rerun only if the exception message matches the pattern,
# for infinite times if Runtime error occurs
@rerun_on_exception(exception_type=RuntimeError, pattern="^Address.*$")
def test_method():
print('hey')
raise RuntimeError('Address already in use')
Args:
exception_type (Exception, Optional): The type of exception to detect for rerun
pattern (str, Optional): The pattern to match the exception message.
If the pattern is not None and matches the exception message,
the exception will be detected for rerun
        max_try (int, Optional): Maximum reruns for this function. The default value is 10.
If max_try is None, it will rerun forever if exception keeps occurring
"""
def _match_lines(lines, pattern):
for line in lines:
if re.match(pattern, line):
return True
return False
def _wrapper(func):
def _run_until_success(*args, **kwargs):
try_count = 0
assert max_try is None or isinstance(
max_try, int
), f"Expected max_try to be None or int, but got {type(max_try)}"
while max_try is None or try_count < max_try:
try:
try_count += 1
ret = func(*args, **kwargs)
return ret
except exception_type as e:
error_lines = str(e).split("\n")
                    if (max_try is None or try_count < max_try) and (pattern is None or _match_lines(error_lines, pattern)):
print("Exception is caught, retrying...")
# when pattern is not specified, we always skip the exception
# when pattern is specified, we only skip when pattern is matched
continue
else:
print("Maximum number of attempts is reached or pattern is not matched, no more retrying...")
raise e
# Override signature
# otherwise pytest.mark.parameterize will raise the following error:
# function does not use argument xxx
sig = signature(func)
_run_until_success.__signature__ = sig
return _run_until_success
return _wrapper
def global_wrapper(rank, func, tp, pp, dp, port, kwargs):
def setup_dist_env(rank, world_size, port):
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["RANK"] = str(rank)
        # NOTE: since we run unit tests on a single node, this is fine!
os.environ["LOCAL_RANK"] = str(rank)
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = str(port)
world_size = tp * pp * dp
setup_dist_env(rank, world_size, port)
parallel_context = ParallelContext(data_parallel_size=dp, pipeline_parallel_size=pp, tensor_parallel_size=tp)
func(parallel_context, **kwargs)
def init_distributed(tp: int, dp: int, pp: int):
def _init_distributed(func):
def wrapper(**kwargs):
from nanotron.utils import find_free_port
world_size = tp * pp * dp
port = find_free_port()
# Note that kwargs needs to be passed as part of args in a way that can be unpacked
args = (func, tp, pp, dp, port, kwargs)
mp.spawn(global_wrapper, args=args, nprocs=world_size)
return wrapper
return _init_distributed
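# Illustrative sketch (not part of the original helpers): init_distributed(tp, dp, pp) returns
# a decorator that spawns tp * dp * pp processes, builds a ParallelContext on each rank and
# passes it as the first argument of the wrapped body. This mirrors how the tests below call it
# and assumes at least one CUDA device is available; the body must be a module-level function
# so torch.multiprocessing can pickle it.
def _example_body(parallel_context: ParallelContext, value: int):
    assert value == 42
    parallel_context.destroy()
def _example_init_distributed():
    init_distributed(tp=1, dp=1, pp=1)(_example_body)(value=42)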
import torch
from nanotron.logging import LoggerWriter
from nanotron.nn.layer_norm import TritonLayerNorm
from torch.nn import LayerNorm
def get_time_name():
import datetime
today = datetime.datetime.now()
return today.strftime("%d/%m/%Y_%H:%M:%S")
if __name__ == "__main__":
BATCH_SIZE = 1
SEQ_LEN = 2
DEVICE, DTYPE = torch.device("cuda:0"), torch.float32
HIDDEN_SIZE = 1024
NUM_STEPS = 10_000
inputs = torch.randn(BATCH_SIZE, SEQ_LEN, HIDDEN_SIZE, device=DEVICE, dtype=DTYPE)
layer_norm = LayerNorm(normalized_shape=inputs.size(-1), device=DEVICE, dtype=DTYPE)
fused_layer_norm = TritonLayerNorm(
normalized_shape=inputs.size(-1),
device=DEVICE,
dtype=DTYPE,
)
ref_optim = torch.optim.Adam(layer_norm.parameters(), lr=0.1)
optim = torch.optim.Adam(fused_layer_norm.parameters(), lr=0.1)
logger = LoggerWriter()
def loss_function(x):
return x.sum()
for step in range(NUM_STEPS):
# NOTE: just make the output fluctuate a bit
random = torch.randn(1, device=DEVICE) * 0.01
ref_outputs = layer_norm(inputs) * random
outputs = fused_layer_norm(inputs) * random
loss = loss_function(outputs)
ref_loss = loss_function(ref_outputs)
ref_optim.zero_grad()
ref_loss.backward()
ref_optim.step()
optim.zero_grad()
loss.backward()
optim.step()
        print(f"Step: {step}, outputs: {outputs.sum()}, ref_outputs: {ref_outputs.sum()}")
print(f"Step: {step}, loss: {loss}, ref_loss: {ref_loss}")
# wandb.log({"loss": loss.item(), "ref_loss": ref_loss.item(), "step": step})
logger.add_scalar("loss", loss.item(), step)
logger.add_scalar("ref_loss", ref_loss.item(), step)
import pytest
import torch
from torch.nn import LayerNorm
from nanotron.nn.layer_norm import TritonLayerNorm
@pytest.mark.fa2
@pytest.mark.parametrize(
"hidden_size",
[1024, 1025], # fused layer norm supports 1024 as hidden size but not 1025
)
def test_fused_layer_norm(hidden_size):
BATCH_SIZE = 5
SEQ_LEN = 128
DEVICE, DTYPE = torch.device("cuda:0"), torch.float16
inputs = torch.rand(BATCH_SIZE, SEQ_LEN, hidden_size, device=DEVICE, dtype=DTYPE)
layer_norm = LayerNorm(normalized_shape=inputs.size(-1), device=DEVICE, dtype=DTYPE)
ref_outputs = layer_norm(inputs)
fused_layer_norm = TritonLayerNorm(
normalized_shape=inputs.size(-1),
device=DEVICE,
dtype=DTYPE,
)
outputs = fused_layer_norm(inputs)
    # NOTE: with torch.float16, FA2 uses an atol of 1e-2
# https://github.com/Dao-AILab/flash-attention/blob/87a1277653fc55cd615f5341255e00c69d5c00a1/tests/ops/triton/test_layer_norm.py#L63-L64
torch.testing.assert_close(outputs, ref_outputs, rtol=1e-3, atol=1e-2)
outputs.sum().backward()
ref_outputs.sum().backward()
# NOTE: same as above
torch.testing.assert_close(fused_layer_norm.weight.grad, layer_norm.weight.grad, rtol=1e-3, atol=1e-2)
torch.testing.assert_close(fused_layer_norm.bias.grad, layer_norm.bias.grad, rtol=1e-3, atol=1e-2)
import sys
from math import isclose
from pathlib import Path
from typing import List
package_path = Path(__file__).parent.parent
sys.path.append(str(package_path))
import numpy as np
import pytest
from helpers.context import TestContext
from helpers.data import (
assert_batch_dataloader,
assert_nanoset_sync_across_all_ranks,
compute_batch_hash,
create_dataset_paths,
create_dummy_json_dataset,
preprocess_dummy_dataset,
)
from helpers.utils import available_gpus, get_all_3d_configurations, init_distributed, rerun_if_address_is_in_use
from nanotron.data.dataloader_builder import build_nanoset_dataloader
from nanotron.data.nanoset import Nanoset
from nanotron.data.utils import count_dataset_indexes, normalize
from nanotron.parallel import ParallelContext
from nanotron.utils import main_rank_first
from transformers import AutoTokenizer
@pytest.mark.parametrize(
"tp,dp,pp",
[
pytest.param(*all_3d_configs)
for gpus in range(1, min(available_gpus(), 4) + 1)
for all_3d_configs in get_all_3d_configurations(gpus)
],
)
@pytest.mark.parametrize("train_steps", [500, 10000])
@pytest.mark.parametrize("sequence_length", [512, 8192])
@pytest.mark.parametrize("tokenizer_name_or_path", ["openai-community/gpt2", "unsloth/llama-3-8b-bnb-4bit"])
@rerun_if_address_is_in_use()
def test_build_nanoset_dataloader(
tp: int, dp: int, pp: int, train_steps: int, sequence_length: int, tokenizer_name_or_path: str
):
test_context = TestContext()
# Create dataset folders
json_paths, datatrove_tokenized_dataset_folders = create_dataset_paths(
tmp_dir=test_context.get_auto_remove_tmp_dir(), quantity=2
)
# Create dummy json datasets
for idx, json_path in enumerate(json_paths):
create_dummy_json_dataset(path_to_json=json_path, dummy_text=f"Nanoset {idx}!", n_samples=(idx + 1) * 50000)
# Preprocess json dataset with datatrove
for json_path, datatrove_tokenized_dataset_folder in zip(json_paths, datatrove_tokenized_dataset_folders):
preprocess_dummy_dataset(json_path, datatrove_tokenized_dataset_folder, tokenizer_name_or_path)
init_distributed(tp=tp, dp=dp, pp=pp)(_test_build_nanoset_dataloader)(
datatrove_tokenized_dataset_folders=datatrove_tokenized_dataset_folders,
train_steps=train_steps,
sequence_length=sequence_length,
tokenizer_name_or_path=tokenizer_name_or_path,
)
def _test_build_nanoset_dataloader(
parallel_context: ParallelContext,
datatrove_tokenized_dataset_folders: List[str],
train_steps: int,
sequence_length: int,
tokenizer_name_or_path: str,
):
SEED = 1234
MICRO_BATCH_SIZE = 4
N_MICRO_BATCHES_PER_BATCH = 8
GLOBAL_BATCH_SIZE = MICRO_BATCH_SIZE * N_MICRO_BATCHES_PER_BATCH * parallel_context.dp_pg.size()
input_pp_rank, output_pp_rank = 0, int(parallel_context.pp_pg.size() - 1)
# Get tokenizer cardinality
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path)
token_size = 4 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else 2
del tokenizer
# Create Nanoset configs: 1. Normal 2. Blended 3. Blended with weights
nanoset_config = {
"dataset_folders": [datatrove_tokenized_dataset_folders[0]],
"dataset_weights": [1],
"sequence_length": sequence_length,
"token_size": token_size,
"train_split_num_samples": train_steps * GLOBAL_BATCH_SIZE,
"random_seed": SEED,
}
blended_nanoset_config = {
"dataset_folders": datatrove_tokenized_dataset_folders,
"dataset_weights": None,
"sequence_length": sequence_length,
"token_size": token_size,
"train_split_num_samples": train_steps * GLOBAL_BATCH_SIZE,
"random_seed": SEED,
}
blended_weighted_nanoset_config = {
"dataset_folders": datatrove_tokenized_dataset_folders,
"dataset_weights": [8, 2],
"sequence_length": sequence_length,
"token_size": token_size,
"train_split_num_samples": train_steps * GLOBAL_BATCH_SIZE,
"random_seed": SEED,
}
configs = [nanoset_config, blended_nanoset_config, blended_weighted_nanoset_config]
for config in configs:
# Create Nanoset
with main_rank_first(parallel_context.world_pg):
train_dataset = Nanoset(**config)
# Assert we have the same Nanoset in all ranks
assert_nanoset_sync_across_all_ranks(train_dataset, parallel_context)
dataset_sample_count = count_dataset_indexes(train_dataset.dataset_index, len(train_dataset.dataset_folders))
for idx, ds_length in enumerate(train_dataset.dataset_lengths):
# Assert Nanoset doesn't sample indexes greater than the datasets
assert (
np.max(train_dataset.dataset_sample_index, where=train_dataset.dataset_index == idx, initial=-1)
< ds_length
            ), f"Error building Nanoset Indexes: Trying to access sample {np.max(train_dataset.dataset_sample_index, where=train_dataset.dataset_index==idx, initial = -1)} of a {ds_length} sample dataset"
# Assert Nanoset builds up the correct blend WRT the dataset_weights
assert isclose(
normalize(dataset_sample_count).tolist()[idx], train_dataset.dataset_weights[idx], abs_tol=0.05
), f"Requested Nanoset to contain {round(train_dataset.dataset_weights[idx]*100, 2)}% of samples from {train_dataset.dataset_folders[idx]} but got {round(normalize(dataset_sample_count).tolist()[idx]*100, 2)}%"
# Create Dataloaders
dataloader = build_nanoset_dataloader(
train_dataset,
sequence_length=sequence_length,
parallel_context=parallel_context,
input_pp_rank=input_pp_rank,
output_pp_rank=output_pp_rank,
micro_batch_size=MICRO_BATCH_SIZE,
dataloader_num_workers=0,
dataloader_drop_last=True,
)
# Check a batch produced by the Dataloader
batch = next(iter(dataloader))
assert_batch_dataloader(
batch=batch,
parallel_context=parallel_context,
micro_batch_size=MICRO_BATCH_SIZE,
sequence_length=sequence_length,
)
parallel_context.destroy()
@pytest.mark.parametrize(
"tp,dp,pp",
[
pytest.param(*all_3d_configs)
for gpus in range(1, min(available_gpus(), 4) + 1)
for all_3d_configs in get_all_3d_configurations(gpus)
],
)
@pytest.mark.parametrize("skipped_batches", [20, 5555])
@pytest.mark.parametrize("tokenizer_name_or_path", ["openai-community/gpt2", "unsloth/llama-3-8b-bnb-4bit"])
@rerun_if_address_is_in_use()
def test_recover_nanoset_dataloader(tp: int, dp: int, pp: int, skipped_batches: int, tokenizer_name_or_path: str):
test_context = TestContext()
# Create dataset folders
json_paths, datatrove_tokenized_dataset_folders = create_dataset_paths(
tmp_dir=test_context.get_auto_remove_tmp_dir(), quantity=2
)
# Create dummy json datasets
for idx, json_path in enumerate(json_paths):
create_dummy_json_dataset(path_to_json=json_path, dummy_text=f"Nanoset {idx}!", n_samples=(idx + 1) * 50000)
# Preprocess json dataset with datatrove
for json_path, datatrove_tokenized_dataset_folder in zip(json_paths, datatrove_tokenized_dataset_folders):
preprocess_dummy_dataset(json_path, datatrove_tokenized_dataset_folder, tokenizer_name_or_path)
init_distributed(tp=tp, dp=dp, pp=pp)(_test_recover_nanoset_dataloader)(
datatrove_tokenized_dataset_folders=datatrove_tokenized_dataset_folders,
skipped_batches=skipped_batches,
tokenizer_name_or_path=tokenizer_name_or_path,
)
def _test_recover_nanoset_dataloader(
parallel_context: ParallelContext,
datatrove_tokenized_dataset_folders: List[str],
skipped_batches: int,
tokenizer_name_or_path: str,
):
SEED = 1234
MICRO_BATCH_SIZE = 4
N_MICRO_BATCHES_PER_BATCH = 8
GLOBAL_BATCH_SIZE = MICRO_BATCH_SIZE * N_MICRO_BATCHES_PER_BATCH * parallel_context.dp_pg.size()
SEQUENCE_LENGTH = 1024
TRAIN_STEPS = 10000
input_pp_rank, output_pp_rank = 0, int(parallel_context.pp_pg.size() - 1)
# Get tokenizer cardinality
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path)
token_size = 4 if len(tokenizer) > np.iinfo(np.uint16).max + 1 else 2
del tokenizer
# Create Nanoset configs: 1. Normal 2. Blended 3. Blended with weights
nanoset_config = {
"dataset_folders": [datatrove_tokenized_dataset_folders[0]],
"dataset_weights": [1],
"sequence_length": SEQUENCE_LENGTH,
"token_size": token_size,
"train_split_num_samples": TRAIN_STEPS * GLOBAL_BATCH_SIZE,
"random_seed": SEED,
}
blended_nanoset_config = {
"dataset_folders": datatrove_tokenized_dataset_folders,
"dataset_weights": None,
"sequence_length": SEQUENCE_LENGTH,
"token_size": token_size,
"train_split_num_samples": TRAIN_STEPS * GLOBAL_BATCH_SIZE,
"random_seed": SEED,
}
blended_weighted_nanoset_config = {
"dataset_folders": datatrove_tokenized_dataset_folders,
"dataset_weights": [8, 2],
"sequence_length": SEQUENCE_LENGTH,
"token_size": token_size,
"train_split_num_samples": TRAIN_STEPS * GLOBAL_BATCH_SIZE,
"random_seed": SEED,
}
configs = [nanoset_config, blended_nanoset_config, blended_weighted_nanoset_config]
for config in configs:
# Create Nanoset
with main_rank_first(parallel_context.world_pg):
train_dataset = Nanoset(**config)
# Create initial Dataloader
dataloader = build_nanoset_dataloader(
train_dataset,
sequence_length=SEQUENCE_LENGTH,
parallel_context=parallel_context,
input_pp_rank=input_pp_rank,
output_pp_rank=output_pp_rank,
micro_batch_size=MICRO_BATCH_SIZE,
dataloader_num_workers=0,
dataloader_drop_last=True,
)
# Recover from failures
dataloader = iter(dataloader)
for _ in range(skipped_batches + 1): # In order to compare with the first batch of the recovered DataLoader
batch = next(dataloader)
# Create recover Dataloader
recovered_dataloader = build_nanoset_dataloader(
train_dataset,
sequence_length=SEQUENCE_LENGTH,
parallel_context=parallel_context,
input_pp_rank=input_pp_rank,
output_pp_rank=output_pp_rank,
micro_batch_size=MICRO_BATCH_SIZE,
dataloader_num_workers=0,
dataloader_drop_last=True,
            # NOTE: The dataloader serves batches of micro_batch_size regardless of batch_accumulation_per_replica
consumed_train_samples=skipped_batches * MICRO_BATCH_SIZE * parallel_context.dp_pg.size(),
)
recovered_first_batch = next(iter(recovered_dataloader))
assert compute_batch_hash(batch) == compute_batch_hash(recovered_first_batch)
parallel_context.destroy()
[pytest]
addopts=-n 35
markers =
fa2: FA2-related
import pytest
import torch
import torch.distributed as dist
from helpers.llama import TINY_LLAMA_CONFIG, create_llama_from_config, get_llama_training_config
from helpers.utils import init_distributed, rerun_if_address_is_in_use
from nanotron.config import Config, ModelArgs, RandomInit
from nanotron.parallel import ParallelContext
from nanotron.parallel.pipeline_parallel.block import PipelineBlock
from torch import nn
@pytest.mark.parametrize("tp,dp,pp", [(1, 1, 1), (2, 2, 2)])
@pytest.mark.skip
@rerun_if_address_is_in_use()
def test_get_named_modules_in_pp_rank(tp: int, dp: int, pp: int):
model_args = ModelArgs(init_method=RandomInit(std=1.0), model_config=TINY_LLAMA_CONFIG)
config = get_llama_training_config(model_args)
init_distributed(tp=tp, dp=dp, pp=pp)(_test_get_named_modules_in_pp_rank)(config=config)
def _test_get_named_modules_in_pp_rank(
parallel_context: ParallelContext,
config: Config,
):
model = create_llama_from_config(
model_config=config.model.model_config,
device=torch.device("cuda"),
parallel_context=parallel_context,
)
model.init_model_randomly(config=config)
modules_that_not_in_current_pp_rank = {}
current_pp_rank = dist.get_rank(group=parallel_context.pp_pg)
for name, module in model.named_modules():
if isinstance(module, PipelineBlock) and module.rank != current_pp_rank:
modules_that_not_in_current_pp_rank[name] = module
named_modules_in_pp_rank = model.named_modules_in_pp_rank
for name, module in named_modules_in_pp_rank.items():
# NOTE: if a module is in the current rank, we expect it to be an initialized module
# not PipelineBlock
assert isinstance(module, nn.Module)
assert name not in modules_that_not_in_current_pp_rank
from typing import Union
import torch
from nanotron.parallel.pipeline_parallel.tensor_pointer import TensorPointer
from nanotron.utils import checkpoint_method
from torch import nn
class CheckpointedModel(nn.Module):
def __init__(self, is_checkpointed: bool = False):
super().__init__()
self.dense1 = nn.Linear(10, 10)
self.dense2 = nn.Linear(10, 10)
self.dropout = nn.Dropout(0.1)
self.is_checkpointed = is_checkpointed
self.fwd_counter = 0
@checkpoint_method("is_checkpointed")
def forward(self, x: Union[torch.Tensor, TensorPointer]):
x = self.dense1(x)
if self.is_checkpointed and self.fwd_counter == 0:
assert not x.requires_grad, "x should not require grad when checkpointed, because fwd runs in no_grad mode"
assert (
x.grad_fn is None
), "x should not store any activation when checkpointed, because fwd runs in no_grad mode"
x = self.dense2(x)
x = self.dropout(x)
self.fwd_counter += 1
return x
class DummyModel(nn.Module):
def __init__(self, is_checkpointed: bool = False):
super().__init__()
self.dense0 = nn.Linear(10, 10)
self.checkpointed_model = CheckpointedModel(is_checkpointed=is_checkpointed)
self.dense3 = nn.Linear(10, 10)
def forward(self, x: Union[torch.Tensor, TensorPointer]):
x = self.dense0(x)
x = self.checkpointed_model(x)
assert x.requires_grad # inside forward, x should require grad even if calculated in no_grad mode
x = self.dense3(x)
return x
def test_activation_checkpointing():
dtype = torch.float16
device = torch.device("cuda")
test_model = DummyModel(is_checkpointed=True)
ref_model = DummyModel(is_checkpointed=False)
for model in [test_model, ref_model]:
model.to(device=device, dtype=dtype)
# copy weights
test_model.load_state_dict(ref_model.state_dict())
assert test_model.checkpointed_model.is_checkpointed is True
assert ref_model.checkpointed_model.is_checkpointed is False
# generate random input
x = torch.randn(10, 10, device=device, dtype=dtype)
# Forward pass
with torch.random.fork_rng(devices=["cuda"]):
ref_output = ref_model(x)
checkpointed_output = test_model(x)
assert test_model.checkpointed_model.fwd_counter == 1
torch.testing.assert_close(checkpointed_output, ref_output)
# Backward pass (check that fwd is called twice, and that we don't store the activations)
ref_output.sum().backward()
assert ref_model.checkpointed_model.fwd_counter == 1, "ref_model fwd should not be called twice"
# make sure grads are not synced between test_model and ref_model
assert ref_model.dense0.weight.grad is not None
assert test_model.dense0.weight.grad is None
assert test_model.checkpointed_model.fwd_counter == 1
checkpointed_output.sum().backward()
assert test_model.checkpointed_model.fwd_counter == 2, "test_model fwd should be called twice"
# compare all models grads
for ref_param, checkpointed_param in zip(ref_model.parameters(), test_model.parameters()):
torch.testing.assert_close(ref_param.grad, checkpointed_param.grad)
# TODO @nouamanetazi: test `checkpoint_method` vs `torch.utils.checkpoint.checkpoint`
# TODO @nouamanetazi: test a method with kwargs values
# TODO @nouamanetazi: test `checkpoint_method` in a distributed setting
# TODO @nouamanetazi: test BatchNorm layers with checkpointing
import math
import os
import pytest
import torch
from helpers.dummy import DummyModel, dummy_infinite_data_loader
from helpers.utils import available_gpus, init_distributed, rerun_if_address_is_in_use
from nanotron import distributed as dist
from nanotron.models import init_on_device_and_dtype
from nanotron.optim.clip_grads import clip_grad_norm
from nanotron.optim.gradient_accumulator import (
FP32GradientAccumulator,
)
from nanotron.parallel import ParallelContext
from nanotron.parallel.parameters import NanotronParameter, sanity_check
from nanotron.parallel.pipeline_parallel.engine import (
AllForwardAllBackwardPipelineEngine,
)
from nanotron.parallel.pipeline_parallel.p2p import P2P
from nanotron.parallel.tensor_parallel.enum import TensorParallelLinearMode
from nanotron.parallel.tensor_parallel.nn import (
TensorParallelColumnLinear,
)
from nanotron.parallel.tied_parameters import (
sync_tied_weights_gradients,
tie_parameters,
)
from nanotron.parallel.utils import initial_sync
from nanotron.sanity_checks import assert_tensor_synced_across_pg
from torch import nn
@pytest.mark.skipif(available_gpus() < 2, reason="test_clip_grads_with_pp requires at least 2 gpus")
@pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0])
@rerun_if_address_is_in_use()
def test_clip_grads_with_pp(norm_type: float):
init_distributed(tp=1, dp=1, pp=2)(_test_clip_grads_with_pp)(norm_type=norm_type)
def _test_clip_grads_with_pp(parallel_context: ParallelContext, norm_type: float):
device = torch.device("cuda")
p2p = P2P(parallel_context.pp_pg, device=device)
reference_rank = 0
has_reference_model = dist.get_rank(parallel_context.pp_pg) == reference_rank
pipeline_engine = AllForwardAllBackwardPipelineEngine()
current_pp_rank = dist.get_rank(parallel_context.pp_pg)
# spawn model
model = DummyModel(p2p=p2p)
if has_reference_model:
reference_model = DummyModel(p2p=p2p)
# Set the ranks
assert len(model.mlp) == parallel_context.pp_pg.size()
with init_on_device_and_dtype(device):
for pp_rank, non_linear in zip(range(parallel_context.pp_pg.size()), model.mlp):
non_linear.linear.build_and_set_rank(pp_rank=pp_rank)
non_linear.activation.build_and_set_rank(pp_rank=pp_rank)
model.loss.build_and_set_rank(pp_rank=parallel_context.pp_pg.size() - 1)
# build reference model
if has_reference_model:
for non_linear in reference_model.mlp:
non_linear.linear.build_and_set_rank(pp_rank=reference_rank)
non_linear.activation.build_and_set_rank(pp_rank=reference_rank)
reference_model.loss.build_and_set_rank(pp_rank=reference_rank)
for module in model.modules():
if isinstance(module, nn.Linear):
setattr(module, "weight", NanotronParameter(module.weight))
setattr(module, "bias", NanotronParameter(module.bias))
# synchronize weights
if has_reference_model:
with torch.inference_mode():
for pp_rank in range(parallel_context.pp_pg.size()):
reference_non_linear = reference_model.mlp[pp_rank].linear.pp_block
if pp_rank == current_pp_rank:
# We already have the weights locally
non_linear = model.mlp[pp_rank].linear.pp_block
reference_non_linear.weight.data.copy_(non_linear.weight.data)
reference_non_linear.bias.data.copy_(non_linear.bias.data)
continue
weight, bias = p2p.recv_tensors(num_tensors=2, from_rank=pp_rank)
reference_non_linear.weight.data.copy_(weight.data)
reference_non_linear.bias.data.copy_(bias.data)
else:
p2p.send_tensors(
[model.mlp[current_pp_rank].linear.pp_block.weight, model.mlp[current_pp_rank].linear.pp_block.bias],
to_rank=reference_rank,
)
# Get infinite dummy data iterator
data_iterator = dummy_infinite_data_loader(pp_pg=parallel_context.pp_pg) # First rank receives data
n_micro_batches_per_batch = 5
batch = [next(data_iterator) for _ in range(n_micro_batches_per_batch)]
pipeline_engine.train_batch_iter(
model, pg=parallel_context.pp_pg, batch=batch, nb_microbatches=n_micro_batches_per_batch, grad_accumulator=None
)
# Equivalent on the reference model
if has_reference_model:
for micro_batch in batch:
loss = reference_model(**micro_batch)
loss /= n_micro_batches_per_batch
loss.backward()
# Check that gradient are the same as reference
pp_rank = dist.get_rank(parallel_context.pp_pg)
if has_reference_model:
for pp_rank in range(parallel_context.pp_pg.size()):
reference_non_linear = reference_model.mlp[pp_rank].linear.pp_block
if pp_rank == current_pp_rank:
# We already have the gradients locally
non_linear = model.mlp[pp_rank].linear.pp_block
torch.testing.assert_close(
non_linear.weight.grad,
reference_non_linear.weight.grad,
atol=1e-6,
rtol=1e-7,
)
torch.testing.assert_close(non_linear.bias.grad, reference_non_linear.bias.grad, atol=1e-6, rtol=1e-7)
continue
weight_grad, bias_grad = p2p.recv_tensors(num_tensors=2, from_rank=pp_rank)
torch.testing.assert_close(weight_grad, reference_non_linear.weight.grad, atol=1e-6, rtol=1e-7)
torch.testing.assert_close(bias_grad, reference_non_linear.bias.grad, atol=1e-6, rtol=1e-7)
else:
p2p.send_tensors(
[model.mlp[pp_rank].linear.pp_block.weight.grad, model.mlp[pp_rank].linear.pp_block.bias.grad],
to_rank=reference_rank,
)
non_linear = model.mlp[current_pp_rank].linear.pp_block
old_weight_grad = non_linear.weight.grad.clone()
old_bias_grad = non_linear.bias.grad.clone()
# Clip grads
total_norm = clip_grad_norm(
mp_pg=parallel_context.mp_pg,
named_parameters=model.named_parameters(),
grad_accumulator=None,
max_norm=1.0,
norm_type=norm_type,
)
if has_reference_model:
reference_total_norm = torch.nn.utils.clip_grad_norm_(
reference_model.parameters(), max_norm=1.0, norm_type=norm_type
)
torch.testing.assert_close(total_norm, reference_total_norm, atol=1e-6, rtol=1e-7)
# Check that grad changed
assert not torch.allclose(old_weight_grad, non_linear.weight.grad), "Grad should have changed"
    assert not torch.allclose(old_bias_grad, non_linear.bias.grad), "Grad should have changed"
# Check that gradient are the same as reference
if has_reference_model:
for pp_rank in range(parallel_context.pp_pg.size()):
reference_non_linear = reference_model.mlp[pp_rank].linear.pp_block
if pp_rank == current_pp_rank:
# We already have the gradients locally
non_linear = model.mlp[pp_rank].linear.pp_block
torch.testing.assert_close(
non_linear.weight.grad,
reference_non_linear.weight.grad,
atol=1e-6,
rtol=1e-7,
)
torch.testing.assert_close(
non_linear.bias.grad,
reference_non_linear.bias.grad,
atol=1e-6,
rtol=1e-7,
)
continue
weight_grad, bias_grad = p2p.recv_tensors(num_tensors=2, from_rank=pp_rank)
torch.testing.assert_close(weight_grad, reference_non_linear.weight.grad, atol=1e-6, rtol=1e-7)
torch.testing.assert_close(bias_grad, reference_non_linear.bias.grad, atol=1e-6, rtol=1e-7)
else:
p2p.send_tensors(
[
model.mlp[current_pp_rank].linear.pp_block.weight.grad,
model.mlp[current_pp_rank].linear.pp_block.bias.grad,
],
to_rank=reference_rank,
)
parallel_context.destroy()
@pytest.mark.skipif(available_gpus() < 2, reason="test_clip_grads_with_tp requires at least 2 gpus")
@pytest.mark.parametrize(
"tp_mode,async_communication",
[
pytest.param(TensorParallelLinearMode.ALL_REDUCE, False),
pytest.param(TensorParallelLinearMode.REDUCE_SCATTER, True),
],
)
@pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0])
@rerun_if_address_is_in_use()
def test_clip_grads_with_tp(tp_mode: TensorParallelLinearMode, async_communication: bool, norm_type: float):
init_distributed(tp=2, dp=1, pp=1)(_test_clip_grads_with_tp)(
tp_mode=tp_mode, async_communication=async_communication, norm_type=norm_type
)
def _test_clip_grads_with_tp(
parallel_context: ParallelContext, tp_mode: TensorParallelLinearMode, async_communication: bool, norm_type: float
):
if async_communication:
os.environ["CUDA_DEVICE_MAX_CONNECTIONS"] = "1"
in_features = 4
out_features_per_tp_rank = 8
out_features = parallel_context.tp_pg.size() * out_features_per_tp_rank
# Sharded
column_linear = TensorParallelColumnLinear(
in_features=in_features,
out_features=out_features,
pg=parallel_context.tp_pg,
mode=tp_mode,
device="cuda",
async_communication=async_communication,
)
# Un-sharded
reference_linear = nn.Linear(in_features=in_features, out_features=out_features, device="cuda")
# Copy weights/bias from sharded to un-sharded
with torch.inference_mode():
dist.all_gather(
tensor_list=list(reference_linear.weight.split(out_features_per_tp_rank, dim=0)),
tensor=column_linear.weight,
group=parallel_context.tp_pg,
)
dist.all_gather(
tensor_list=list(reference_linear.bias.split(out_features_per_tp_rank, dim=0)),
tensor=column_linear.bias,
group=parallel_context.tp_pg,
)
# Generate random input
random_input: torch.Tensor
sharded_random_input: torch.Tensor
if tp_mode is TensorParallelLinearMode.ALL_REDUCE:
batch_size = 5
random_input = torch.randn(batch_size, in_features, device="cuda")
# synchronize random_input across tp
dist.all_reduce(random_input, op=dist.ReduceOp.AVG, group=parallel_context.tp_pg)
sharded_random_input = random_input
elif tp_mode is TensorParallelLinearMode.REDUCE_SCATTER:
sharded_batch_size = 5
sharded_random_input = torch.randn(sharded_batch_size, in_features, device="cuda")
random_input = torch.empty(
sharded_batch_size * parallel_context.tp_pg.size(),
*(sharded_random_input.shape[1:]),
device=sharded_random_input.device,
dtype=sharded_random_input.dtype,
)
dist.all_gather_into_tensor(random_input, sharded_random_input, group=parallel_context.tp_pg)
else:
        raise ValueError(f"Unsupported mode: {tp_mode}")
# Test that we get the same output after forward pass
sharded_output = column_linear(sharded_random_input)
reference_output = reference_linear(random_input)
# TODO @thomasw21: Tune tolerance
torch.testing.assert_close(
sharded_output,
reference_output[
:,
dist.get_rank(parallel_context.tp_pg)
* out_features_per_tp_rank : (dist.get_rank(parallel_context.tp_pg) + 1)
* out_features_per_tp_rank,
],
atol=1e-6,
rtol=1e-7,
)
# Test that we get the same gradient after backward pass
sharded_output.sum().backward()
reference_output.sum().backward()
torch.testing.assert_close(
column_linear.weight.grad,
reference_linear.weight.grad[
dist.get_rank(parallel_context.tp_pg)
* out_features_per_tp_rank : (dist.get_rank(parallel_context.tp_pg) + 1)
* out_features_per_tp_rank
],
atol=1e-6,
rtol=1e-7,
)
torch.testing.assert_close(
column_linear.bias.grad,
reference_linear.bias.grad[
dist.get_rank(parallel_context.tp_pg)
* out_features_per_tp_rank : (dist.get_rank(parallel_context.tp_pg) + 1)
* out_features_per_tp_rank
],
atol=1e-6,
rtol=1e-7,
)
old_grad = column_linear.weight.grad.clone()
# Clip grads
total_norm = clip_grad_norm(
mp_pg=parallel_context.mp_pg,
named_parameters=column_linear.named_parameters(),
grad_accumulator=None,
max_norm=1.0,
norm_type=norm_type,
)
ref_total_norm = torch.nn.utils.clip_grad_norm_(reference_linear.parameters(), max_norm=1.0, norm_type=norm_type)
# Check that the gradients have changed
assert not torch.allclose(old_grad, column_linear.weight.grad), "Gradients should have changed after clipping"
# Test that we get the same gradient after clipping
torch.testing.assert_close(
column_linear.weight.grad,
reference_linear.weight.grad[
dist.get_rank(parallel_context.tp_pg)
* out_features_per_tp_rank : (dist.get_rank(parallel_context.tp_pg) + 1)
* out_features_per_tp_rank
],
)
torch.testing.assert_close(
column_linear.bias.grad,
reference_linear.bias.grad[
dist.get_rank(parallel_context.tp_pg)
* out_features_per_tp_rank : (dist.get_rank(parallel_context.tp_pg) + 1)
* out_features_per_tp_rank
],
)
torch.testing.assert_close(total_norm, ref_total_norm)
parallel_context.destroy()
@pytest.mark.skipif(available_gpus() < 2, reason="test_clip_grads_tied_weights requires at least 2 gpus")
@pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0])
@rerun_if_address_is_in_use()
def test_clip_grads_tied_weights(norm_type: float):
init_distributed(tp=1, dp=1, pp=2)(_test_clip_grads_tied_weights)(norm_type=norm_type)
def _test_clip_grads_tied_weights(parallel_context: ParallelContext, norm_type: float):
if dist.get_rank(parallel_context.pp_pg) == 0:
model = nn.ModuleDict({"dense0": nn.Linear(10, 10, device="cuda")})
else:
model = nn.ModuleDict({"dense1": nn.Linear(10, 10, device="cuda")})
# Tie weights/bias
tie_parameters(
root_module=model,
ties=[("dense0.weight", (0,)), ("dense1.weight", (1,))],
parallel_context=parallel_context,
reduce_op=dist.ReduceOp.SUM,
)
tie_parameters(
root_module=model,
ties=[("dense0.bias", (0,)), ("dense1.bias", (1,))],
parallel_context=parallel_context,
reduce_op=dist.ReduceOp.SUM,
)
group = parallel_context.world_ranks_to_pg[(0, 1)]
# Check that model weights are not in fact synchronized
if dist.get_rank(parallel_context.pp_pg) == 0:
weight = model.dense0.weight
bias = model.dense0.bias
else:
weight = model.dense1.weight
bias = model.dense1.bias
# Make sure that weight/bias are NanotronParameter and that they are tied
assert isinstance(weight, NanotronParameter)
assert weight.is_tied
assert isinstance(bias, NanotronParameter)
assert bias.is_tied
# Sync tied weights: basic assumption
initial_sync(model=model, parallel_context=parallel_context)
# Check that weights are now synced
assert_tensor_synced_across_pg(weight, group)
assert_tensor_synced_across_pg(bias, group)
# Compute gradient
input_ = torch.randn(13, 10, device="cuda")
if dist.get_rank(parallel_context.pp_pg) == 0:
out = model.dense0(input_)
else:
out = model.dense1(input_)
out.sum().backward()
# sync gradients
sync_tied_weights_gradients(model, parallel_context=parallel_context, grad_accumulator=None)
    # We check that both gradients are synchronized
assert_tensor_synced_across_pg(weight.grad, group)
assert_tensor_synced_across_pg(bias.grad, group)
# Save grads as reference
ref_weight = weight.clone()
ref_weight.grad = weight.grad.clone()
ref_bias = bias.clone()
ref_bias.grad = bias.grad.clone()
old_grad = weight.grad.clone()
# Clip grads
total_norm = clip_grad_norm(
mp_pg=parallel_context.mp_pg,
named_parameters=model.named_parameters(),
grad_accumulator=None,
max_norm=1.0,
norm_type=norm_type,
)
ref_total_norm = torch.nn.utils.clip_grad_norm_([ref_weight, ref_bias], max_norm=1.0, norm_type=norm_type)
# Check that the gradients have changed
assert not torch.allclose(old_grad, weight.grad), "Gradients should have changed after clipping"
# Test that we get the same gradient after clipping
assert torch.allclose(weight.grad, ref_weight.grad, rtol=1e-7, atol=1e-6)
assert torch.allclose(bias.grad, ref_bias.grad, rtol=1e-7, atol=1e-6)
assert torch.allclose(total_norm, ref_total_norm, rtol=0, atol=0), f"Got {total_norm} and {ref_total_norm}"
parallel_context.destroy()
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0])
@rerun_if_address_is_in_use()
def test_clip_grads_fp32_accumulator(norm_type: float, half_precision: torch.dtype):
init_distributed(tp=1, dp=1, pp=2)(_test_clip_grads_fp32_accumulator)(
norm_type=norm_type, half_precision=half_precision
)
def _test_clip_grads_fp32_accumulator(
parallel_context: ParallelContext, norm_type: float, half_precision: torch.dtype
):
device = torch.device("cuda")
p2p = P2P(parallel_context.pp_pg, device=device)
reference_rank = 0
has_reference_model = dist.get_rank(parallel_context.pp_pg) == reference_rank
pipeline_engine = AllForwardAllBackwardPipelineEngine()
current_pp_rank = dist.get_rank(parallel_context.pp_pg)
# spawn model
model = DummyModel(p2p=p2p)
if has_reference_model:
reference_model = DummyModel(p2p=p2p).to(torch.float)
# Set the ranks
assert len(model.mlp) == parallel_context.pp_pg.size()
with init_on_device_and_dtype(device):
for pp_rank, non_linear in zip(range(parallel_context.pp_pg.size()), model.mlp):
non_linear.linear.build_and_set_rank(pp_rank=pp_rank)
non_linear.activation.build_and_set_rank(pp_rank=pp_rank)
model.loss.build_and_set_rank(pp_rank=parallel_context.pp_pg.size() - 1)
if has_reference_model:
for non_linear in reference_model.mlp:
non_linear.linear.build_and_set_rank(pp_rank=reference_rank)
non_linear.activation.build_and_set_rank(pp_rank=reference_rank)
reference_model.loss.build_and_set_rank(pp_rank=reference_rank)
for module in model.modules():
if isinstance(module, nn.Linear):
setattr(module, "weight", NanotronParameter(module.weight))
setattr(module, "bias", NanotronParameter(module.bias))
# model goes to half precision
model = model.to(half_precision)
# synchronize weights
if has_reference_model:
with torch.inference_mode():
for pp_rank in range(parallel_context.pp_pg.size()):
reference_non_linear = reference_model.mlp[pp_rank].linear.pp_block
if pp_rank == current_pp_rank:
# We already have the weights locally
non_linear = model.mlp[pp_rank].linear.pp_block
reference_non_linear.weight.data.copy_(non_linear.weight.data)
reference_non_linear.bias.data.copy_(non_linear.bias.data)
continue
weight, bias = p2p.recv_tensors(num_tensors=2, from_rank=pp_rank)
reference_non_linear.weight.data.copy_(weight.data)
reference_non_linear.bias.data.copy_(bias.data)
else:
p2p.send_tensors(
[model.mlp[current_pp_rank].linear.pp_block.weight, model.mlp[current_pp_rank].linear.pp_block.bias],
to_rank=reference_rank,
)
# Add gradient accumulator
grad_accumulator = FP32GradientAccumulator(model.named_parameters())
# Check that our model is a valid model
sanity_check(model)
# Compute backward
# Get infinite dummy data iterator
data_iterator = dummy_infinite_data_loader(
pp_pg=parallel_context.pp_pg, dtype=half_precision
) # First rank receives data
n_micro_batches_per_batch = 5
batch = [next(data_iterator) for _ in range(n_micro_batches_per_batch)]
pipeline_engine.train_batch_iter(
model,
pg=parallel_context.pp_pg,
batch=batch,
nb_microbatches=n_micro_batches_per_batch,
grad_accumulator=grad_accumulator,
)
# Copy the model gradients into the reference model's gradients.
# We do this instead of recomputing the backward pass with autograd to avoid numerical-precision mismatches.
if has_reference_model:
for pp_rank in range(parallel_context.pp_pg.size()):
reference_non_linear = reference_model.mlp[pp_rank].linear.pp_block
prefix_name = f"mlp.{pp_rank}.linear.pp_block"
if pp_rank == current_pp_rank:
# We already have the gradients locally
reference_non_linear.weight.grad = grad_accumulator.get_grad_buffer(f"{prefix_name}.weight").clone()
reference_non_linear.bias.grad = grad_accumulator.get_grad_buffer(f"{prefix_name}.bias").clone()
continue
weight_grad, bias_grad = p2p.recv_tensors(num_tensors=2, from_rank=pp_rank)
reference_non_linear.weight.grad = weight_grad
reference_non_linear.bias.grad = bias_grad
else:
p2p.send_tensors(
[
grad_accumulator.get_grad_buffer(f"mlp.{current_pp_rank}.linear.pp_block.weight"),
grad_accumulator.get_grad_buffer(f"mlp.{current_pp_rank}.linear.pp_block.bias"),
],
to_rank=reference_rank,
)
old_fp32_grads = {
name: grad_accumulator.get_grad_buffer(name=name).clone() for name, _ in model.named_parameters()
}
# Clip grads
total_norm = clip_grad_norm(
mp_pg=parallel_context.mp_pg,
named_parameters=model.named_parameters(),
grad_accumulator=grad_accumulator,
max_norm=1.0,
norm_type=norm_type,
)
if has_reference_model:
ref_total_norm = torch.nn.utils.clip_grad_norm_(
reference_model.parameters(), max_norm=1.0, norm_type=norm_type
)
# Check that the gradients have changed
for name, _ in model.named_parameters():
new_fp32_grad = grad_accumulator.get_grad_buffer(name=name)
assert not torch.allclose(old_fp32_grads[name], new_fp32_grad), "Gradients should have changed after clipping"
# We check that we get the same gradient accumulation. In theory we do get more precision by promoting gradients to fp32.
if has_reference_model:
torch.testing.assert_close(
total_norm.view(1),
ref_total_norm.view(1),
atol=1e-6,
rtol=1e-7,
msg=lambda msg: f"Expected {total_norm} to match {ref_total_norm}.\n{msg}",
)
for pp_rank in range(parallel_context.pp_pg.size()):
reference_non_linear = reference_model.mlp[pp_rank].linear.pp_block
prefix_name = f"mlp.{pp_rank}.linear.pp_block"
if pp_rank == current_pp_rank:
# We already have the gradients locally
torch.testing.assert_close(
reference_non_linear.weight.grad,
grad_accumulator.get_grad_buffer(f"{prefix_name}.weight"),
atol=1e-6,
rtol=1e-7,
)
torch.testing.assert_close(
reference_non_linear.bias.grad,
grad_accumulator.get_grad_buffer(f"{prefix_name}.bias"),
atol=1e-6,
rtol=1e-7,
)
continue
weight_grad, bias_grad = p2p.recv_tensors(num_tensors=2, from_rank=pp_rank)
torch.testing.assert_close(
reference_non_linear.weight.grad,
weight_grad,
atol=1e-6,
rtol=1e-7,
)
torch.testing.assert_close(
reference_non_linear.bias.grad,
bias_grad,
atol=1e-6,
rtol=1e-7,
)
else:
p2p.send_tensors(
[
grad_accumulator.get_grad_buffer(f"mlp.{current_pp_rank}.linear.pp_block.weight"),
grad_accumulator.get_grad_buffer(f"mlp.{current_pp_rank}.linear.pp_block.bias"),
],
to_rank=reference_rank,
)
parallel_context.destroy()
from contextlib import nullcontext
import pytest
import torch
from helpers.exception import assert_fail_except_rank_with
from helpers.utils import available_gpus, init_distributed, rerun_if_address_is_in_use
from nanotron import distributed as dist
from nanotron.parallel import ParallelContext
from nanotron.parallel.data_parallel.utils import ddp_trigger_sync_in_bwd
from nanotron.parallel.parameters import NanotronParameter
from nanotron.sanity_checks import assert_tensor_synced_across_pg
from torch import nn
from torch.distributed import GradBucket
@pytest.mark.skipif(available_gpus() < 2, reason="Testing test_ddp_with_afab requires at least 2 gpus")
@pytest.mark.parametrize("accumulation_steps", [1, 3])
@rerun_if_address_is_in_use()
def test_ddp_with_afab(accumulation_steps):
init_distributed(tp=1, dp=2, pp=1)(_test_ddp_with_afab)(accumulation_steps=accumulation_steps)
def _test_ddp_with_afab(parallel_context: ParallelContext, accumulation_steps: int):
half_precision = torch.float16
def allreduce_hook(process_group: dist.ProcessGroup, bucket: GradBucket):
# DDP groups grads in GradBuckets. This hook is called throughout the bwd pass, once each bucket is ready to overlap communication with computation.
# See https://pytorch.org/docs/stable/ddp_comm_hooks.html#what-does-a-communication-hook-operate-on for more details.
half_flat_bucket_buffer = bucket.buffer()
group_to_use = process_group if process_group is not None else parallel_context.dp_pg
return (
dist.all_reduce(half_flat_bucket_buffer, group=group_to_use, async_op=True, op=dist.ReduceOp.AVG)
.get_future()
.then(lambda fut: fut.value()[0])
)
model_hook = nn.Linear(3, 2, bias=False, dtype=half_precision, device="cuda")
# Create Nanotron Parameter
model_hook.weight = NanotronParameter(model_hook.weight)
model_ddp_hook = torch.nn.parallel.DistributedDataParallel(
model_hook,
process_group=parallel_context.dp_pg,
)
# Register DDP hook
model_ddp_hook.register_comm_hook(state=None, hook=allreduce_hook)
activations = []
# All forward
for i in range(accumulation_steps):
input = torch.randn(5, 3, dtype=half_precision, device="cuda")
with model_ddp_hook.no_sync():
loss_hook = model_ddp_hook(input).sum()
activations.append(loss_hook)
# All backward
for i in range(accumulation_steps):
context = nullcontext()
if i == accumulation_steps - 1:
context = ddp_trigger_sync_in_bwd(model_ddp_hook) # triggers a sync for the final backward
loss_hook = activations[i]
with context:
loss_hook.backward()
grad_hook = model_ddp_hook.module.weight.grad.clone()
# Check that the gradients are synchronized across DP
if i == accumulation_steps - 1:
assert_tensor_synced_across_pg(grad_hook, parallel_context.dp_pg)
else:
with assert_fail_except_rank_with(AssertionError, rank_exception=0, pg=parallel_context.dp_pg):
assert_tensor_synced_across_pg(grad_hook, parallel_context.dp_pg)
parallel_context.destroy()
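# Hedged single-process illustration (gloo backend, world_size=1; a hypothetical standalone
# script, not part of the test suite) of the `no_sync()` contract exercised above: backward
# passes under `no_sync()` only accumulate local gradients, and the first backward outside
# the context triggers the DDP gradient all-reduce.
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29501")
dist.init_process_group(backend="gloo", rank=0, world_size=1)

ddp_model = DDP(torch.nn.Linear(3, 2, bias=False))
x = torch.randn(5, 3)
with ddp_model.no_sync():
    ddp_model(x).sum().backward()  # gradients accumulate locally, no communication
ddp_model(x).sum().backward()      # this backward performs the gradient all-reduce

dist.destroy_process_group()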
import numpy as np
import pytest
import torch.distributed as dist
from helpers.utils import (
available_gpus,
get_all_3d_configurations,
init_distributed,
rerun_if_address_is_in_use,
)
from nanotron.parallel import ParallelContext
from torch.distributed import ProcessGroup
def _test_init_parallel_context(parallel_context: ParallelContext):
assert dist.is_initialized() is True
assert isinstance(parallel_context.world_pg, ProcessGroup)
assert isinstance(parallel_context.tp_pg, ProcessGroup) if parallel_context.tensor_parallel_size > 1 else True
assert isinstance(parallel_context.pp_pg, ProcessGroup) if parallel_context.pipeline_parallel_size > 1 else True
assert isinstance(parallel_context.dp_pg, ProcessGroup) if parallel_context.data_parallel_size > 1 else True
world_rank = dist.get_rank(parallel_context.world_pg)
ranks3d = parallel_context.get_local_ranks(world_rank)
assert isinstance(ranks3d, tuple) and len(ranks3d)
assert isinstance(parallel_context.world_rank_matrix, np.ndarray)
assert isinstance(parallel_context.world_ranks_to_pg, dict)
local_rank = tuple(i.item() for i in np.where(parallel_context.world_rank_matrix == world_rank))
global_rank = parallel_context.get_global_rank(*local_rank)
assert isinstance(global_rank, np.int64), f"The type of global_rank is {type(global_rank)}"
assert global_rank == dist.get_rank()
parallel_context.destroy()
assert dist.is_initialized() is False
@pytest.mark.parametrize(
"tp,dp,pp",
[
pytest.param(*all_3d_configs)
for gpus in range(1, min(available_gpus(), 4) + 1)
for all_3d_configs in get_all_3d_configurations(gpus)
],
)
@rerun_if_address_is_in_use()
def test_init_parallel_context(tp: int, dp: int, pp: int):
init_distributed(tp=tp, dp=dp, pp=pp)(_test_init_parallel_context)()
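# Hedged standalone sketch (plain numpy; the (pp, dp, tp) ordering here is a hypothetical
# layout, not necessarily the one ParallelContext uses) of the bookkeeping checked above:
# a matrix of world ranks lets you map a world rank to its 3D coordinates and back.
import numpy as np

pp, dp, tp = 2, 2, 2
world_rank_matrix = np.arange(pp * dp * tp).reshape(pp, dp, tp)
world_rank = 5
coords = tuple(int(i) for i in np.argwhere(world_rank_matrix == world_rank)[0])
assert world_rank_matrix[coords] == world_rank
print(coords)  # (1, 0, 1) under this hypothetical ordering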
from typing import Union
import pytest
import torch
from helpers.llama import TINY_LLAMA_CONFIG, create_llama_from_config, get_llama_training_config
from helpers.utils import init_distributed, rerun_if_address_is_in_use
from nanotron.config import ModelArgs, RandomInit, SpectralMupInit
from nanotron.helpers import get_custom_lr_for_named_parameters
from nanotron.parallel import ParallelContext
from nanotron.scaling.parametrization import ParametrizationMethod
@pytest.mark.parametrize("tp,dp,pp", [(1, 1, 1), (2, 1, 1), (1, 1, 2), (2, 1, 2)])
@pytest.mark.parametrize(
"parametrization_method", [ParametrizationMethod.STANDARD, ParametrizationMethod.SPECTRAL_MUP]
)
@pytest.mark.skip
@rerun_if_address_is_in_use()
def test_get_custom_lr(tp: int, dp: int, pp: int, parametrization_method: ParametrizationMethod):
LR = 1e-3
if parametrization_method == ParametrizationMethod.STANDARD:
init_method = RandomInit(std=1.0)
elif parametrization_method == ParametrizationMethod.SPECTRAL_MUP:
init_method = SpectralMupInit(use_mup=True)
init_distributed(tp=tp, dp=dp, pp=pp)(_test_get_custom_lr)(
lr=LR,
init_method=init_method,
parametrization_method=parametrization_method,
)
def _test_get_custom_lr(
parallel_context: ParallelContext,
lr: float,
init_method: Union[RandomInit, SpectralMupInit],
parametrization_method: ParametrizationMethod,
):
model_args = ModelArgs(init_method=init_method, model_config=TINY_LLAMA_CONFIG)
config = get_llama_training_config(model_args)
llama = create_llama_from_config(
model_config=TINY_LLAMA_CONFIG,
device=torch.device("cuda"),
parallel_context=parallel_context,
)
llama.init_model_randomly(config=config, init_method=parametrization_method)
named_parameters = list(llama.get_named_params_with_correct_tied())
if len(named_parameters) == 0:
# NOTE: some pp ranks don't have any parameters
return
named_param_groups = get_custom_lr_for_named_parameters(
parametrization_method=parametrization_method, lr=lr, named_parameters=named_parameters, model=llama
)
assert len(named_param_groups) == len(named_parameters)
assert all(isinstance(named_param_group["lr"], float) for named_param_group in named_param_groups)
assert all(isinstance(named_param_group["named_params"], list) for named_param_group in named_param_groups)
is_all_lr_the_same = parametrization_method == ParametrizationMethod.STANDARD
assert all(named_param_group["lr"] == lr for named_param_group in named_param_groups) is is_all_lr_the_same
import pytest
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from helpers.utils import available_gpus, init_distributed, rerun_if_address_is_in_use
from nanotron.optim.gradient_accumulator import FP32GradientAccumulator
from nanotron.optim.named_optimizer import NamedOptimizer
from nanotron.optim.optimizer_from_gradient_accumulator import OptimizerFromGradientAccumulator
from nanotron.parallel.context import ParallelContext
from nanotron.parallel.parameters import NanotronParameter
from nanotron.random import set_random_seed
class DummyModel(nn.Module):
def __init__(self, dtype=torch.float32):
super(DummyModel, self).__init__()
self.fc1 = nn.Linear(10, 20, bias=False).to(dtype=dtype)
self.fc2 = nn.Linear(20, 2, bias=False).to(dtype=dtype)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return x
def test_optimizer_lr_one_group():
set_random_seed(42)
model = DummyModel().to("cuda")
lr1 = 0.1
named_params_or_groups = []
for name, param in model.named_parameters():
named_params_or_groups.append((name, param))
named_params_or_groups = [{"named_params": named_params_or_groups, "lr": lr1}]
optimizer = NamedOptimizer(
named_params_or_groups=named_params_or_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that should be overwritten by the lr in the named_params_or_groups
),
)
input = torch.randn(10, 10).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for _ in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output, target)
loss.backward()
fc1_grad = model.fc1.weight.grad.clone()
fc2_grad = model.fc2.weight.grad.clone()
# compute gradient manually
with torch.no_grad():
expected_fc1_weight = model.fc1.weight - lr1 * fc1_grad
expected_fc2_weight = model.fc2.weight - lr1 * fc2_grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
def test_optimizer_lr_multiple_group():
set_random_seed(42)
model = DummyModel().to("cuda")
lr1, lr2 = 0.1, 0.001
named_params_or_groups = [
{"named_params": [(name, param) for name, param in model.named_parameters() if "fc1" in name], "lr": lr1},
{"named_params": [(name, param) for name, param in model.named_parameters() if "fc2" in name], "lr": lr2},
]
optimizer = NamedOptimizer(
named_params_or_groups=named_params_or_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that should be overwritten by the lr in the named_params_or_groups
),
)
input = torch.randn(10, 10).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for _ in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output, target)
loss.backward()
fc1_grad = model.fc1.weight.grad.clone()
fc2_grad = model.fc2.weight.grad.clone()
with torch.no_grad():
expected_fc1_weight = model.fc1.weight - lr1 * fc1_grad
expected_fc2_weight = model.fc2.weight - lr2 * fc2_grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
def test_optimizer_lr_weight_decay_one_group():
set_random_seed(42)
model = DummyModel().to("cuda")
lr1 = 0.1
weight_decay = 0.1
named_params_or_groups = []
for name, param in model.named_parameters():
named_params_or_groups.append((name, param))
named_params_or_groups = [{"named_params": named_params_or_groups, "lr": lr1, "weight_decay": weight_decay}]
optimizer = NamedOptimizer(
named_params_or_groups=named_params_or_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that should be overwritten by the lr in the named_params_or_groups
),
)
input = torch.randn(10, 10).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for _ in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output, target)
loss.backward()
# Compute gradient manually and apply weight decay
with torch.no_grad():
expected_fc1_weight = (1 - lr1 * weight_decay) * model.fc1.weight - lr1 * model.fc1.weight.grad
expected_fc2_weight = (1 - lr1 * weight_decay) * model.fc2.weight - lr1 * model.fc2.weight.grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
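# Minimal standalone sketch (plain torch on CPU, not part of the test suite) of the update
# rule used as the reference above: SGD with weight decay performs
# w <- (1 - lr * weight_decay) * w - lr * grad.
import torch

w = torch.nn.Parameter(torch.tensor([1.0, -2.0]))
lr, weight_decay = 0.1, 0.1
opt = torch.optim.SGD([w], lr=lr, weight_decay=weight_decay)
(w ** 2).sum().backward()
expected = (1 - lr * weight_decay) * w.detach() - lr * w.grad
opt.step()
torch.testing.assert_close(w.detach(), expected)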
def test_optimizer_lr_weight_decay_multiple_group():
set_random_seed(42)
model = DummyModel().to("cuda")
lr1, lr2 = 0.1, 0.001
weight_decay1, weight_decay2 = 0.1, 0.001
named_params_or_groups = [
{
"named_params": [(name, param) for name, param in model.named_parameters() if "fc1" in name],
"lr": lr1,
"weight_decay": weight_decay1,
},
{
"named_params": [(name, param) for name, param in model.named_parameters() if "fc2" in name],
"lr": lr2,
"weight_decay": weight_decay2,
},
]
optimizer = NamedOptimizer(
named_params_or_groups=named_params_or_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that should be overwritten by the lr in the named_params_or_groups
),
)
input = torch.randn(10, 10).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for _ in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output, target)
loss.backward()
# Compute gradient manually and apply weight decay
with torch.no_grad():
expected_fc1_weight = (1 - lr1 * weight_decay1) * model.fc1.weight - lr1 * model.fc1.weight.grad
expected_fc2_weight = (1 - lr2 * weight_decay2) * model.fc2.weight - lr2 * model.fc2.weight.grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("accumulation_steps", [1, 10])
def test_optimizer_grad_accumulation_lr_one_group(half_precision: torch.dtype, accumulation_steps: int):
set_random_seed(42)
dtype = half_precision
lr1 = 0.1
model = DummyModel(dtype=dtype).to("cuda")
# Need to convert the weights to NanotronParameter for the gradient accumulation to work
model.fc1.weight = NanotronParameter(model.fc1.weight)
model.fc2.weight = NanotronParameter(model.fc2.weight)
named_params_or_groups = []
for name, param in model.named_parameters():
named_params_or_groups.append((name, param))
named_params_or_groups = [{"named_params": named_params_or_groups, "lr": lr1}]
# Optimizer
def optimizer_builder(inp_param_groups):
return NamedOptimizer(
named_params_or_groups=inp_param_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that should be overwritten by the lr in the named_params_or_groups
),
)
optimizer = OptimizerFromGradientAccumulator(
gradient_accumulator_builder=lambda named_params: FP32GradientAccumulator(named_parameters=named_params),
named_params_or_groups=named_params_or_groups,
optimizer_builder=optimizer_builder,
)
accumulator = optimizer.gradient_accumulator
input = torch.randn(10, 10, dtype=dtype).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for batch_idx in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output.float(), target)
accumulator.backward(loss)
if (batch_idx + 1) % accumulation_steps == 0:
# Manual update weights for ref
with torch.no_grad():
fc1_grad = accumulator.get_grad_buffer(name="fc1.weight").to(dtype)
expected_fc1_weight = model.fc1.weight - lr1 * fc1_grad
fc2_grad = accumulator.get_grad_buffer(name="fc2.weight").to(dtype)
expected_fc2_weight = model.fc2.weight - lr1 * fc2_grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("accumulation_steps", [1, 10])
def test_optimizer_grad_accumulation_lr_multiple_group(half_precision: torch.dtype, accumulation_steps: int):
set_random_seed(42)
dtype = half_precision
lr1, lr2 = 0.1, 0.001
model = DummyModel(dtype=dtype).to("cuda")
# Need to convert the weights to NanotronParameter for the gradient accumulation to work
model.fc1.weight = NanotronParameter(model.fc1.weight)
model.fc2.weight = NanotronParameter(model.fc2.weight)
named_params_or_groups = [
{"named_params": [(name, param) for name, param in model.named_parameters() if "fc1" in name], "lr": lr1},
{"named_params": [(name, param) for name, param in model.named_parameters() if "fc2" in name], "lr": lr2},
]
# Optimizer
def optimizer_builder(inp_param_groups):
return NamedOptimizer(
named_params_or_groups=inp_param_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that should be overwritten by the lr in the named_params_or_groups
),
)
optimizer = OptimizerFromGradientAccumulator(
gradient_accumulator_builder=lambda named_params: FP32GradientAccumulator(named_parameters=named_params),
named_params_or_groups=named_params_or_groups,
optimizer_builder=optimizer_builder,
)
accumulator = optimizer.gradient_accumulator
input = torch.randn(10, 10, dtype=dtype).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for batch_idx in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output.float(), target)
accumulator.backward(loss)
if (batch_idx + 1) % accumulation_steps == 0:
# Manual update weights for ref
with torch.no_grad():
fc1_grad = accumulator.get_grad_buffer(name="fc1.weight").to(dtype)
expected_fc1_weight = model.fc1.weight - lr1 * fc1_grad
fc2_grad = accumulator.get_grad_buffer(name="fc2.weight").to(dtype)
expected_fc2_weight = model.fc2.weight - lr2 * fc2_grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("accumulation_steps", [1, 10])
def test_optimizer_grad_accumulation_lr_weight_decay_one_group(half_precision: torch.dtype, accumulation_steps: int):
set_random_seed(42)
dtype = half_precision
lr1 = 0.1
weight_decay = 0.1
model = DummyModel(dtype=dtype).to("cuda")
# Need to convert the weights to NanotronParameter for the gradient accumulation to work
model.fc1.weight = NanotronParameter(model.fc1.weight)
model.fc2.weight = NanotronParameter(model.fc2.weight)
named_params_or_groups = []
for name, param in model.named_parameters():
named_params_or_groups.append((name, param))
named_params_or_groups = [{"named_params": named_params_or_groups, "lr": lr1, "weight_decay": weight_decay}]
# Optimizer
def optimizer_builder(inp_param_groups):
return NamedOptimizer(
named_params_or_groups=inp_param_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that will be overwritten by the lr in the named_params_or_groups
weight_decay=9999999, # this is a dummy value that will be overwritten by the weight_decay in the named_params_or_groups
),
)
optimizer = OptimizerFromGradientAccumulator(
gradient_accumulator_builder=lambda named_params: FP32GradientAccumulator(named_parameters=named_params),
named_params_or_groups=named_params_or_groups,
optimizer_builder=optimizer_builder,
)
accumulator = optimizer.gradient_accumulator
input = torch.randn(10, 10, dtype=dtype).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for batch_idx in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output.float(), target)
accumulator.backward(loss)
if (batch_idx + 1) % accumulation_steps == 0:
# Manual update weights for ref
with torch.no_grad():
fc1_grad = accumulator.get_grad_buffer(name="fc1.weight").to(dtype)
expected_fc1_weight = (1 - lr1 * weight_decay) * model.fc1.weight - lr1 * fc1_grad
fc2_grad = accumulator.get_grad_buffer(name="fc2.weight").to(dtype)
expected_fc2_weight = (1 - lr1 * weight_decay) * model.fc2.weight - lr1 * fc2_grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("accumulation_steps", [1, 10])
def test_optimizer_grad_accumulation_lr_weight_decay_multiple_group(
half_precision: torch.dtype, accumulation_steps: int
):
set_random_seed(42)
dtype = half_precision
lr1, lr2 = 0.1, 0.001
weight_decay1, weight_decay2 = 0.1, 0.001
model = DummyModel(dtype=dtype).to("cuda")
# Need to convert the weights to NanotronParameter for the gradient accumulation to work
model.fc1.weight = NanotronParameter(model.fc1.weight)
model.fc2.weight = NanotronParameter(model.fc2.weight)
named_params_or_groups = [
{
"named_params": [(name, param) for name, param in model.named_parameters() if "fc1" in name],
"lr": lr1,
"weight_decay": weight_decay1,
},
{
"named_params": [(name, param) for name, param in model.named_parameters() if "fc2" in name],
"lr": lr2,
"weight_decay": weight_decay2,
},
]
# Optimizer
def optimizer_builder(inp_param_groups):
return NamedOptimizer(
named_params_or_groups=inp_param_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that will be overwritten by the lr in the named_params_or_groups
weight_decay=9999999, # this is a dummy value that will be overwritten by the weight_decay in the named_params_or_groups
),
)
optimizer = OptimizerFromGradientAccumulator(
gradient_accumulator_builder=lambda named_params: FP32GradientAccumulator(named_parameters=named_params),
named_params_or_groups=named_params_or_groups,
optimizer_builder=optimizer_builder,
)
accumulator = optimizer.gradient_accumulator
input = torch.randn(10, 10, dtype=dtype).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for batch_idx in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output.float(), target)
accumulator.backward(loss)
if (batch_idx + 1) % accumulation_steps == 0:
# Manual update weights for ref
with torch.no_grad():
fc1_grad = accumulator.get_grad_buffer(name="fc1.weight").to(dtype)
expected_fc1_weight = (1 - lr1 * weight_decay1) * model.fc1.weight - lr1 * fc1_grad
fc2_grad = accumulator.get_grad_buffer(name="fc2.weight").to(dtype)
expected_fc2_weight = (1 - lr2 * weight_decay2) * model.fc2.weight - lr2 * fc2_grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
@pytest.mark.skipif(available_gpus() < 2, reason="Testing requires at least 2 gpus")
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("accumulation_steps", [1, 10])
@rerun_if_address_is_in_use()
def test_ddp_optimizer_grad_accumulation_lr_weight_decay_multiple_group(
half_precision: torch.dtype, accumulation_steps: int
):
init_distributed(tp=1, dp=2, pp=1)(_test_ddp_optimizer_grad_accumulation_lr_weight_decay_multiple_group)(
half_precision=half_precision,
accumulation_steps=accumulation_steps,
)
def _test_ddp_optimizer_grad_accumulation_lr_weight_decay_multiple_group(
parallel_context: ParallelContext, half_precision: torch.dtype, accumulation_steps: int
):
set_random_seed(42)
dtype = half_precision
# Making it bigger so that the difference is more visible during update
lr1, lr2 = 0.04, 0.05
weight_decay1, weight_decay2 = 0.5, 0.2
model = DummyModel(dtype=dtype).to("cuda")
# Need to convert the weights to NanotronParameter for the gradient accumulation to work
model.fc1.weight = NanotronParameter(model.fc1.weight)
model.fc2.weight = NanotronParameter(model.fc2.weight)
model_ddp = torch.nn.parallel.DistributedDataParallel(
model,
process_group=parallel_context.dp_pg,
)
named_params_or_groups = [
{
"named_params": [(name, param) for name, param in model_ddp.named_parameters() if "fc1" in name],
"lr": lr1,
"weight_decay": weight_decay1,
},
{
"named_params": [(name, param) for name, param in model_ddp.named_parameters() if "fc2" in name],
"lr": lr2,
"weight_decay": weight_decay2,
},
]
# Optimizer
def optimizer_builder(inp_param_groups):
return NamedOptimizer(
named_params_or_groups=inp_param_groups,
optimizer_builder=lambda param_groups: optim.SGD(
param_groups,
lr=9999999, # this is a dummy value that will be overwritten by the lr in the named_params_or_groups
weight_decay=9999999, # this is a dummy value that will be overwritten by the weight_decay in the named_params_or_groups
),
)
optimizer = OptimizerFromGradientAccumulator(
gradient_accumulator_builder=lambda named_params: FP32GradientAccumulator(named_parameters=named_params),
named_params_or_groups=named_params_or_groups,
optimizer_builder=optimizer_builder,
)
accumulator = optimizer.gradient_accumulator
input = torch.randn(10, 10, dtype=dtype).to(device="cuda")
target = torch.randint(0, 2, (10,)).to(device="cuda")
for batch_idx in range(100):
optimizer.zero_grad()
output = model(input)
loss = F.cross_entropy(output.float(), target)
accumulator.backward(loss)
if (batch_idx + 1) % accumulation_steps == 0:
# Manual update weights for ref
with torch.no_grad():
fc1_grad = accumulator.get_grad_buffer(name="module.fc1.weight").to(dtype)
expected_fc1_weight = (1 - lr1 * weight_decay1) * model.fc1.weight - lr1 * fc1_grad
fc2_grad = accumulator.get_grad_buffer(name="module.fc2.weight").to(dtype)
expected_fc2_weight = (1 - lr2 * weight_decay2) * model.fc2.weight - lr2 * fc2_grad
optimizer.step()
updated_fc1_weight = model.fc1.weight
updated_fc2_weight = model.fc2.weight
torch.testing.assert_close(expected_fc1_weight, updated_fc1_weight)
torch.testing.assert_close(expected_fc2_weight, updated_fc2_weight)
import contextlib
import pytest
import torch
from helpers.exception import assert_fail_with
from helpers.utils import available_gpus, init_distributed, rerun_if_address_is_in_use
from nanotron import distributed as dist
from nanotron.parallel import ParallelContext
from nanotron.parallel.pipeline_parallel.p2p import P2P
@pytest.mark.skipif(available_gpus() < 2, reason="Testing test_ddp_with_afab requires at least 2 gpus")
@pytest.mark.parametrize("send_contiguous", [True, False])
@pytest.mark.parametrize("full", [True, False])
@rerun_if_address_is_in_use()
def test_check_send_recv_tensor(send_contiguous: bool, full: bool):
init_distributed(tp=1, dp=1, pp=2)(_test_check_send_recv_tensor)(send_contiguous=send_contiguous, full=full)
def _test_check_send_recv_tensor(parallel_context: ParallelContext, send_contiguous: bool, full: bool):
p2p = P2P(pg=parallel_context.pp_pg, device=torch.device("cuda"))
if dist.get_rank(p2p.pg) == 0:
tensor_to_send = torch.randn(3, 5, dtype=torch.float, device=torch.device("cuda"))
if send_contiguous is True:
assert tensor_to_send.is_contiguous()
else:
tensor_to_send = tensor_to_send.transpose(0, 1)
assert not tensor_to_send.is_contiguous()
# `full` defines whether we send the full tensor or only a non-trivial slice of it
if full is False:
tensor_to_send = tensor_to_send[1:3]
if send_contiguous is False and full is False:
# This is expected to raise an error suggesting that the tensor should have been made contiguous (via `.contiguous()`) before sending.
send_first_context = assert_fail_with(
AssertionError,
error_msg="Expect storage_size to be smaller than tensor size. It might not be true, when you use slicing for example though. We probably don't want to support it in our P2P system",
)
fail_at_first_send = True
else:
send_first_context = contextlib.nullcontext()
fail_at_first_send = False
# Send tensor back and forth through p2p protocol and check that we get the same thing.
if dist.get_rank(p2p.pg) == 0:
with send_first_context:
handles = p2p.isend_tensors([tensor_to_send], to_rank=1)
if fail_at_first_send is True:
# We early return if we caught an error
return
for handle in handles:
handle.wait()
tensor_travelled_back_and_forth = p2p.recv_tensors(1, from_rank=1)[0]
torch.testing.assert_close(tensor_to_send, tensor_travelled_back_and_forth, atol=0, rtol=0)
elif dist.get_rank(p2p.pg) == 1:
# Early-return instead of waiting on a recv whose matching send from rank 0 will never happen
tensors, handles = p2p.irecv_tensors(1, from_rank=0)
if fail_at_first_send is True:
return
for handle in handles:
handle.wait()
tensor_to_recv = tensors[0]
p2p.send_tensors([tensor_to_recv], to_rank=0)
else:
raise ValueError()
if full is False and send_contiguous is True:
# We can actually check that we haven't sent the entire storage, since storage regions not accessed by the tensor are not sent
if dist.get_rank(p2p.pg) == 0:
# Check that the first elements of the two storages do not match (they are not supposed to be communicated when the tensor is not full).
print(tensor_to_send.untyped_storage()[:4], tensor_travelled_back_and_forth.untyped_storage()[:4])
print(tensor_to_send.as_strided(size=(1,), stride=(1,), storage_offset=0))
print(tensor_travelled_back_and_forth.as_strided(size=(1,), stride=(1,), storage_offset=0))
assert not torch.allclose(
tensor_to_send.as_strided(size=(1,), stride=(1,), storage_offset=0),
tensor_travelled_back_and_forth.as_strided(size=(1,), stride=(1,), storage_offset=0),
)
parallel_context.destroy()
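# Small standalone illustration (plain torch, no P2P involved; assumes a recent PyTorch with
# `untyped_storage()`) of why non-contiguous slices are awkward to send: the viewed elements
# can be far fewer than the floats backing the underlying storage, so shipping the raw
# storage would transfer bytes the tensor never uses.
import torch

t = torch.randn(3, 5)
view = t.transpose(0, 1)[1:3]                             # non-contiguous, non-full view
print(view.is_contiguous())                               # False
print(view.numel(), t.untyped_storage().nbytes() // 4)    # 6 viewed elements vs 15 stored floats
packed = view.contiguous()                                # copies only the viewed data
print(packed.untyped_storage().nbytes() // 4)             # 6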
import torch
from helpers.exception import assert_fail_with
from nanotron.models.base import DTypeInvariantTensor, init_on_device_and_dtype
from nanotron.parallel.parameters import NanotronParameter
from torch import nn
def test_nanotron_parameter_does_not_override_some_parameter_variable():
param = nn.Parameter(torch.empty(3))
assert not hasattr(param, NanotronParameter.NANOTRON_PARAMETER_METADATA_ATTRIBUTE_NAME)
def test_uncastable_tensor():
# Test that we can create a DTypeInvariantTensor
x = DTypeInvariantTensor(torch.randn(3, 3))
assert isinstance(x, torch.Tensor)
assert isinstance(x, DTypeInvariantTensor)
# Test that we cannot modify the type of a DTypeInvariantTensor
with assert_fail_with(RuntimeError, error_msg="Cannot convert the type of an DTypeInvariantTensor to float"):
x = x.float()
with assert_fail_with(RuntimeError, error_msg="Cannot convert the type of an DTypeInvariantTensor to half"):
x = x.half()
with assert_fail_with(RuntimeError, error_msg="Cannot change the type of an DTypeInvariantTensor"):
x = x.to(torch.float32)
with assert_fail_with(RuntimeError, error_msg="Cannot change the type of an DTypeInvariantTensor"):
x = x.to(dtype=torch.float32)
# Test that we can modify the value of a DTypeInvariantTensor
x[0, 0] = 1
assert x[0, 0] == 1
# Test that we can modify the device of a DTypeInvariantTensor
x = x.to("cuda")
assert x.device.type == "cuda"
def test_register_buffer_does_not_update_uncastable_tensor():
old_device = torch.device("cuda")
old_dtype = torch.float32
new_device = torch.device("cpu")
new_dtype = torch.bfloat16
with init_on_device_and_dtype(device=new_device, dtype=new_dtype):
module = torch.nn.Module()
# Test that we can register a DTypeInvariantTensor as a buffer
tensor = DTypeInvariantTensor(torch.randn(3, 4, dtype=old_dtype, device=old_device))
module.register_buffer("buffer", tensor)
# Test that we can modify the buffer
module.buffer[0, 0] = 1
assert module.buffer[0, 0] == 1
# Test that device has been updated
assert module.buffer.device.type == new_device.type
# Test that dtype has not been modified
assert module.buffer.dtype is old_dtype
import copy
import nanotron.distributed as dist
import pytest
import torch
from helpers.dummy import DummyModel, dummy_infinite_data_loader
from helpers.exception import assert_fail_except_rank_with, timeout_after
from helpers.utils import available_gpus, init_distributed, rerun_if_address_is_in_use
from nanotron.models import init_on_device_and_dtype
from nanotron.optim import ZeroDistributedOptimizer
from nanotron.optim.gradient_accumulator import FP32GradBucketManager, FP32GradientAccumulator, get_fp32_accum_hook
from nanotron.optim.named_optimizer import NamedOptimizer
from nanotron.optim.optimizer_from_gradient_accumulator import (
OptimizerFromGradientAccumulator,
)
from nanotron.parallel import ParallelContext
from nanotron.parallel.parameters import NanotronParameter, sanity_check
from nanotron.parallel.pipeline_parallel.engine import (
AllForwardAllBackwardPipelineEngine,
OneForwardOneBackwardPipelineEngine,
PipelineEngine,
)
from nanotron.parallel.pipeline_parallel.p2p import P2P
from nanotron.parallel.pipeline_parallel.utils import get_pp_rank_of
from nanotron.parallel.tied_parameters import (
get_tied_id_to_param,
sync_tied_weights_gradients,
tie_parameters,
)
from nanotron.parallel.utils import initial_sync
from nanotron.sanity_checks import assert_tensor_synced_across_pg
from nanotron.utils import ContextManagers
from torch import nn
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
def test_gradient_promoting_in_fp32(half_precision: torch.dtype):
model = nn.Linear(3, 2, bias=False, dtype=half_precision, device="cuda")
# Create Nanotron Parameter
model.weight = NanotronParameter(model.weight)
# Add gradient accumulator
accumulator = FP32GradientAccumulator(model.named_parameters())
# Check that our model is a valid model
sanity_check(model)
# Compute backward
input = torch.randn(5, 3, dtype=half_precision, device="cuda")
accumulator.backward(model(input).sum())
# Check that we have a high-precision gradient and that the low-precision one is cleared
assert accumulator.parameters["weight"]["fp32"].grad.dtype == torch.float
if model.weight.grad is not None:
# We check that it's zero
torch.testing.assert_close(model.weight.grad, torch.zeros_like(model.weight.grad), atol=1e-6, rtol=1e-7)
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
def test_gradient_accumulated_in_fp32(half_precision: torch.dtype):
model = nn.Linear(3, 2, bias=False, dtype=half_precision, device="cuda")
ref_model = nn.Linear(3, 2, bias=False, dtype=half_precision, device="cuda")
with torch.inference_mode():
ref_model.weight.copy_(model.weight)
# Create Nanotron Parameter
model.weight = NanotronParameter(model.weight)
# Add gradient accumulator
accumulator = FP32GradientAccumulator(model.named_parameters())
# Check that our model is a valid model
sanity_check(model)
# Compute backward
grad_accumulation_steps = 2
for _ in range(grad_accumulation_steps):
# We want large input to have large gradients.
input = (torch.randn(5, 3, dtype=half_precision, device="cuda") ** 2 + 1) * 100
# Compute backwards
accumulator.backward(model(input).sum())
ref_model(input).sum().backward()
# We check that we get the same gradient accumulation. In theory we do get more precision by promoting gradients to fp32.
torch.testing.assert_close(
accumulator.parameters["weight"]["fp32"].grad.to(half_precision),
ref_model.weight.grad,
)
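# Hedged sketch (plain torch on CPU, illustrative only, not the FP32GradientAccumulator
# implementation) of the idea verified above: keep an fp32 master copy of a half-precision
# parameter, promote and accumulate its gradients in fp32, then apply the update in fp32
# and copy the result back in half precision.
import torch

param = torch.nn.Parameter(torch.randn(4, dtype=torch.bfloat16))
master = param.detach().float().clone()
fp32_grad = torch.zeros_like(master)
lr, accumulation_steps = 0.1, 4

for _ in range(accumulation_steps):
    x = torch.randn(4, dtype=torch.bfloat16)
    (param * x).sum().backward()
    fp32_grad += param.grad.float()  # promote to fp32 and accumulate
    param.grad = None                # clear the low-precision gradient

with torch.no_grad():
    master -= lr * fp32_grad / accumulation_steps
    param.copy_(master.to(param.dtype))  # write the update back in half precision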
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
def test_optimizer_can_step_gradient_in_fp32(half_precision: torch.dtype):
model = nn.Linear(3, 2, bias=False, dtype=half_precision, device="cuda")
original_weight = model.weight.detach().clone()
# Create Nanotron Parameter
model.weight = NanotronParameter(model.weight)
# Add optimizer
optimizer = OptimizerFromGradientAccumulator(
gradient_accumulator_builder=lambda named_params: FP32GradientAccumulator(named_parameters=named_params),
named_params_or_groups=model.named_parameters(),
optimizer_builder=lambda named_param_groups: NamedOptimizer(
named_params_or_groups=named_param_groups,
optimizer_builder=lambda param_groups: torch.optim.AdamW(param_groups),
),
)
accumulator = optimizer.gradient_accumulator
# Check that our model is a valid model
sanity_check(model)
# Compute backward
input = torch.randn(5, 3, dtype=half_precision, device="cuda")
accumulator.backward(model(input).sum())
# Check that we have a high-precision gradient and that the low-precision one is cleared
assert accumulator.parameters["weight"]["fp32"].grad.dtype == torch.float
if model.weight.grad is not None:
# We check that it's zero
torch.testing.assert_close(model.weight.grad, torch.zeros_like(model.weight.grad), atol=1e-6, rtol=1e-7)
optimizer.step()
optimizer.zero_grad()
# Check that we no longer have gradients and that they are set to `None`
assert accumulator.parameters["weight"]["fp32"].grad is None
assert model.weight.grad is None
# Check that gradients have been set to zero
fp32_grad = accumulator.get_grad_buffer(name="weight")
torch.testing.assert_close(fp32_grad, torch.zeros_like(fp32_grad), atol=1e-6, rtol=1e-7)
# Check that the weights have been updated
assert not torch.allclose(original_weight, model.weight)
@pytest.mark.skipif(available_gpus() < 2, reason="Testing ddp_hook_allreduce requires at least 2 gpus")
@pytest.mark.parametrize("half_precision", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("accumulation_steps", [1, 10])
@pytest.mark.parametrize("train_iterations", [1, 3])
@rerun_if_address_is_in_use()
def test_ddp_with_grad_accum_in_fp32(half_precision: torch.dtype, accumulation_steps: int, train_iterations: int):
init_distributed(tp=1, dp=2, pp=1)(_test_ddp_with_grad_accum_in_fp32)(
half_precision=half_precision,
accumulation_steps=accumulation_steps,
train_iterations=train_iterations,
)
def _test_ddp_with_grad_accum_in_fp32(
parallel_context: ParallelContext,
half_precision: torch.dtype,
accumulation_steps: int,
train_iterations: int,
):
hidden_size = 32
n_layers = 3
model = nn.Sequential(
nn.Linear(3, hidden_size, bias=False, dtype=half_precision, device="cuda"),
*(
nn.Linear(hidden_size, hidden_size, bias=False, dtype=half_precision, device="cuda")
for _ in range(n_layers - 1)
),
)
model_hook = copy.deepcopy(model)
# Create Nanotron Parameters
for module in model.modules():
if isinstance(module, nn.Linear):
setattr(module, "weight", NanotronParameter(module.weight))
for module in model_hook.modules():
if isinstance(module, nn.Linear):
setattr(module, "weight", NanotronParameter(module.weight))
# Needed in order to obtain smaller gradient buckets when using `DistributedDataParallel`
model_ddp = torch.nn.parallel.DistributedDataParallel(
model,
process_group=parallel_context.dp_pg,
) # we won't actually use this DDP wrapper anywhere; it's just to get the same module names
model_ddp_accum_ref = {}
model_ddp_fp32_accum = torch.nn.parallel.DistributedDataParallel(
model_hook,
process_group=parallel_context.dp_pg,
)
# Add gradient accumulator
accumulator = FP32GradientAccumulator(model_ddp_fp32_accum.named_parameters())
# Register DDP hook
state = FP32GradBucketManager(
dp_pg=parallel_context.dp_pg,
accumulator=accumulator,
param_id_to_name={id(param): name for name, param in model_ddp_fp32_accum.named_parameters()},
)
model_ddp_fp32_accum.register_comm_hook(
state=state,
hook=get_fp32_accum_hook(
reduce_scatter=False,
reduce_op=dist.ReduceOp.AVG,
),
)
for train_iter in range(train_iterations):
# Gradient accumulation steps
for accum_step in range(accumulation_steps - 1):
# Forward-Backward
input = torch.randn(10, 3, dtype=half_precision, device="cuda")
loss = model_ddp.module(input).sum()
assert not torch.isinf(loss).any(), "loss is inf"
loss.backward()
with ContextManagers([model_ddp_fp32_accum.no_sync(), accumulator.no_sync()]):
loss_fp32_accum = model_ddp_fp32_accum(input).sum()
accumulator.backward(loss_fp32_accum)
for name, param in model_ddp.named_parameters():
grad = param.grad
grad_fp32_accum = accumulator.parameters[name]["fp32"].grad
fp32_grad_bucket = accumulator.get_grad_buffer(name=name)
# Check that FP32GradAccum+DDP+hook gives close gradients to DDP
model_ddp_accum_ref[name] = (
grad.float() if accum_step == 0 else model_ddp_accum_ref[name] + grad.float()
)
dist.barrier()
torch.testing.assert_close(model_ddp_accum_ref[name], fp32_grad_bucket, atol=1e-6, rtol=1e-7)
dist.barrier()
# Check that we correctly copied grads from buckets to params (`copy_buckets_to_grads`)
torch.testing.assert_close(fp32_grad_bucket, grad_fp32_accum, atol=1e-6, rtol=1e-7)
# Check that the gradients are not synchronized across DP
with assert_fail_except_rank_with(AssertionError, rank_exception=0, pg=parallel_context.dp_pg):
assert_tensor_synced_across_pg(grad, parallel_context.dp_pg)
with assert_fail_except_rank_with(AssertionError, rank_exception=0, pg=parallel_context.dp_pg):
assert_tensor_synced_across_pg(fp32_grad_bucket, parallel_context.dp_pg)
# We zero out half grads for `model_ddp` because we're accumulating grads manually in `model_ddp_accum_ref`
model_ddp.zero_grad()
# Last accumulation step (Sync grads across DDP)
input = torch.randn(10, 3, dtype=half_precision, device="cuda")
loss = model_ddp.module(input).sum()
loss.backward()
# manually reduce grads across DDP
for name, param in model_ddp.named_parameters():
grad = param.grad
model_ddp_accum_ref[name] = (
model_ddp_accum_ref[name] + grad.float() if name in model_ddp_accum_ref else grad.float()
)
dist.all_reduce(model_ddp_accum_ref[name], group=parallel_context.dp_pg, op=dist.ReduceOp.AVG)
loss_fp32_accum = model_ddp_fp32_accum(input).sum()
accumulator.backward(loss_fp32_accum)
for name, param in model_ddp_fp32_accum.named_parameters():
# Check that half grads have been set to None in the sync step, to avoid them being incorrectly used
half_grad = param.grad
assert half_grad is None, f"{half_grad} != None"
grad = model_ddp_accum_ref[name]
grad_fp32_accum = accumulator.parameters[name]["fp32"].grad
fp32_grad_bucket = accumulator.get_grad_buffer(name=name)
# Check that FP32GradAccum+DDP+hook gives close gradients to DDP
dist.barrier()
torch.testing.assert_close(grad, fp32_grad_bucket, atol=1e-6, rtol=1e-7)
# Check that grad points to the same memory as the bucket
assert grad_fp32_accum.data_ptr() == fp32_grad_bucket.data_ptr()
# Check that the gradients are synchronized across DP
assert_tensor_synced_across_pg(grad, parallel_context.dp_pg)
assert_tensor_synced_across_pg(grad_fp32_accum, parallel_context.dp_pg)
# Zero out gradients (Usually it's the optimizer that does this)
model_ddp.zero_grad()
model_ddp_accum_ref = {}
accumulator.zero_grad() # Sets half grads to None and zeroes out fp32 grad buckets
for name, elt in accumulator.parameters.items():
fp32_param = elt["fp32"]
fp32_param.grad = None
# Check that fp32 grad buckets are zeroed out and `param.grad` is set to None
for name, param in model_ddp_fp32_accum.named_parameters():
assert param.grad is None
fp32_grad_bucket = accumulator.get_grad_buffer(name=name)
dist.barrier()
torch.testing.assert_close(fp32_grad_bucket, torch.zeros_like(fp32_grad_bucket), atol=1e-6, rtol=1e-7)
# Check that all fp32 grad buckets are zeroed out
for _, elt in accumulator.fp32_grad_buffers.items():
fp32_grad = elt["fp32_grad"]
# This is important as we assume grad buckets to be zeroed out at the first accumulation step
dist.barrier()
torch.testing.assert_close(fp32_grad, torch.zeros_like(fp32_grad), atol=1e-6, rtol=1e-7)
parallel_context.destroy()
@pytest.mark.skipif(
available_gpus() < 4, reason="Testing test_tied_weights_sync_with_grad_accum_in_fp32 requires at least 4 gpus"
)
@pytest.mark.parametrize(
"pipeline_engine", [AllForwardAllBackwardPipelineEngine(), OneForwardOneBackwardPipelineEngine()]
)
@pytest.mark.parametrize("reduce_scatter", [True, False])
@rerun_if_address_is_in_use()
def test_tied_weights_sync_with_grad_accum_in_fp32(pipeline_engine: PipelineEngine, reduce_scatter: bool):
init_distributed(tp=1, dp=2, pp=2)(_test_tied_weights_sync_with_grad_accum_in_fp32)(
pipeline_engine=pipeline_engine, reduce_scatter=reduce_scatter
)
def _test_tied_weights_sync_with_grad_accum_in_fp32(
parallel_context: ParallelContext, pipeline_engine: PipelineEngine, reduce_scatter: bool
):
# We initialize two replicas of 2 dense layers. Each dense layer lives on its own device.
dtype = torch.float16
device = torch.device("cuda")
p2p = P2P(pg=parallel_context.pp_pg, device=device)
model = DummyModel(p2p=p2p)
reference_model = DummyModel(p2p=p2p)
reference_model_accum_ref = {}
for mdl in [model, reference_model]:
# Set the ranks
with init_on_device_and_dtype(device, dtype):
assert parallel_context.pp_pg.size() == len(mdl.mlp)
for pp_rank, non_linear in zip(range(parallel_context.pp_pg.size()), mdl.mlp):
non_linear.linear.build_and_set_rank(pp_rank=pp_rank)
non_linear.activation.build_and_set_rank(pp_rank=pp_rank)
mdl.loss.build_and_set_rank(pp_rank=parallel_context.pp_pg.size() - 1)
# Tie all dense weights across PP
tie_parameters(
root_module=mdl,
ties=[
(
target,
(
parallel_context.get_global_rank(
ep_rank=dist.get_rank(parallel_context.expert_pg),
pp_rank=get_pp_rank_of(target, module=mdl),
dp_rank=dist.get_rank(parallel_context.dp_pg),
tp_rank=dist.get_rank(parallel_context.tp_pg),
),
),
)
for target in [
f"mlp.{pp_rank}.linear.pp_block.weight" for pp_rank in range(parallel_context.pp_pg.size())
]
],
parallel_context=parallel_context,
reduce_op=dist.ReduceOp.SUM,
)
for name, module in mdl.named_modules():
if isinstance(module, nn.Linear):
module.bias = NanotronParameter(module.bias)
# Sync DP and tied weights: basic assumption
initial_sync(model=mdl, parallel_context=parallel_context)
# Sync params between `model` and `reference_model`
with torch.no_grad():
for name, param in model.named_parameters():
param.copy_(reference_model.get_parameter(name))
# DDP
model_ddp = torch.nn.parallel.DistributedDataParallel(model, process_group=parallel_context.dp_pg)
module_id_to_prefix = {id(module): f"{module_name}." for module_name, module in model.named_modules()}
reference_module_id_to_prefix = {
id(module): f"{module_name}." for module_name, module in reference_model.named_modules()
}
# Fix the root_model
module_id_to_prefix[id(model)] = ""
reference_module_id_to_prefix[id(reference_model)] = ""
# named parameters
named_parameters = [
(
param.get_tied_info().get_full_name_from_module_id_to_prefix(module_id_to_prefix=module_id_to_prefix)
if param.is_tied
else name,
param,
)
for name, param in model.named_parameters()
]
# Optimizer: We don't actually run the optimizer, we just use it to build the gradient accumulator
optimizer = ZeroDistributedOptimizer(
dp_pg=parallel_context.dp_pg,
named_params_or_groups=named_parameters,
optimizer_builder=lambda named_param_groups_1: OptimizerFromGradientAccumulator(
gradient_accumulator_builder=lambda named_params: FP32GradientAccumulator(
named_parameters=named_params,
grad_buckets_named_params=named_parameters,
),
named_params_or_groups=named_param_groups_1,
optimizer_builder=lambda named_param_groups_2: NamedOptimizer(
named_params_or_groups=named_param_groups_2,
optimizer_builder=lambda param_groups: torch.optim.AdamW(param_groups),
),
),
)
param_id_to_name = {
id(param): param.get_tied_info().get_full_name_from_module_id_to_prefix(
module_id_to_prefix=module_id_to_prefix
)
if param.is_tied
else name
for name, param in model.named_parameters()
}
# Add gradient accumulator
# We use `model_ddp.module` in order to have the parameter names without the `module.` prefix
accumulator = optimizer.optimizer.gradient_accumulator
accumulator.assign_param_offsets(
dp_rank=dist.get_rank(parallel_context.dp_pg),
param_name_to_offsets=optimizer.param_name_to_dp_rank_offsets,
)
model_ddp.register_comm_hook(
state=FP32GradBucketManager(
dp_pg=parallel_context.dp_pg,
accumulator=accumulator,
param_id_to_name=param_id_to_name,
),
hook=get_fp32_accum_hook(reduce_scatter=reduce_scatter, reduce_op=dist.ReduceOp.AVG),
)
# Get infinite dummy data iterator
data_iterator = dummy_infinite_data_loader(pp_pg=parallel_context.pp_pg, dtype=dtype) # First rank receives data
n_micro_batches_per_batch = 2
batch = [next(data_iterator) for _ in range(n_micro_batches_per_batch)]
## Reference model iteration step
def forward_backward_reference(mdl, micro_batch):
pipeline_engine.train_batch_iter(
mdl, pg=parallel_context.pp_pg, batch=[micro_batch], nb_microbatches=1, grad_accumulator=None
)
for accum_step in range(n_micro_batches_per_batch - 1):
# Forward-Backward
forward_backward_reference(reference_model, batch[accum_step])
# Accumulate grads
for name, param in reference_model.named_parameters():
grad = param.grad
if param.is_tied:
tied_info = param.get_tied_info()
name = tied_info.get_full_name_from_module_id_to_prefix(
module_id_to_prefix=reference_module_id_to_prefix
)
reference_model_accum_ref[name] = (
grad.float() if accum_step == 0 else reference_model_accum_ref[name] + grad.float()
)
# We zero out half grads for `reference_model` because we're accumulating grads manually in `reference_model_accum_ref`
reference_model.zero_grad()
# Last accumulation step (Sync grads across DDP)
forward_backward_reference(reference_model, batch[-1])
# manually reduce grads across DDP
for name, param in reference_model.named_parameters():
grad = param.grad
if param.is_tied:
tied_info = param.get_tied_info()
name = tied_info.get_full_name_from_module_id_to_prefix(module_id_to_prefix=reference_module_id_to_prefix)
reference_model_accum_ref[name] = (
reference_model_accum_ref[name] + grad.float() if name in reference_model_accum_ref else grad.float()
)
dist.all_reduce(reference_model_accum_ref[name], group=parallel_context.dp_pg, op=dist.ReduceOp.AVG)
## Model iteration step
pipeline_engine.train_batch_iter(
model_ddp,
pg=parallel_context.pp_pg,
batch=batch,
nb_microbatches=n_micro_batches_per_batch,
grad_accumulator=accumulator,
)
for name, param in model_ddp.module.named_parameters():
if param.is_tied:
tied_info = param.get_tied_info()
name = tied_info.get_full_name_from_module_id_to_prefix(module_id_to_prefix=module_id_to_prefix)
# Each parameter is sharded across DP.
assert (
name in accumulator.parameters
), f"`accumulator.parameters` must have all params {name} not in `accumulator.parameters`. Existing keys are: {accumulator.parameters}"
fp32_grad = accumulator.get_grad_buffer(name=name)
if not reduce_scatter:
# Check that the gradients are synchronized across DP
assert_tensor_synced_across_pg(fp32_grad, parallel_context.dp_pg)
fp32_grad_ref = reference_model_accum_ref[name]
dist.barrier()
if reduce_scatter:
slice_ = slice(*accumulator.param_name_to_offsets[name])
# Check that gradients are correct
torch.testing.assert_close(
fp32_grad_ref.view(-1)[slice_] / n_micro_batches_per_batch,
fp32_grad.view(-1)[slice_],
rtol=1e-7,
atol=1e-6,
msg=lambda msg: f"FP32 Gradients at `{name}` don't match\n - Expected: {fp32_grad_ref.view(-1)[slice_] / n_micro_batches_per_batch}\n - Got: {fp32_grad.view(-1)[slice_]}",
)
else:
# Check that gradients are correct
torch.testing.assert_close(fp32_grad_ref / n_micro_batches_per_batch, fp32_grad, rtol=1e-7, atol=1e-6)
# Check that tied weights grads are not synchronized yet
for (name, group_ranks), param in sorted(
get_tied_id_to_param(parameters=model_ddp.parameters(), root_module=model_ddp.module).items(),
key=lambda x: x[0],
):
if not (isinstance(param, NanotronParameter) and param.is_tied):
continue
group = parallel_context.world_ranks_to_pg[group_ranks]
fp32_grad = accumulator.get_grad_buffer(name=name)
with assert_fail_except_rank_with(AssertionError, rank_exception=0, pg=group):
assert_tensor_synced_across_pg(
tensor=fp32_grad,
pg=group,
msg=lambda err: f"Tied weights's grads {name} are not synchronized. {err}",
)
# Sync tied weights grads (e.g. sync dense1 and dense2 grads in DP=0, but the problem is that DP=0 has only optim states for dense1)
# - Translate tied ranks along DP axis to find the DP rank that has the tied weights
# - accumulator keeps grads for all DPs, so we can just sync the grads
with timeout_after():
sync_tied_weights_gradients(
module=model_ddp.module, parallel_context=parallel_context, grad_accumulator=accumulator
)
tied_infos_dict = {
(
param.get_tied_info().get_full_name_from_module_id_to_prefix(module_id_to_prefix=module_id_to_prefix),
param.get_tied_info().global_ranks,
param.get_tied_info().reduce_op,
): param
for name, param in model_ddp.module.named_parameters()
if param.is_tied
}
# Check that tied weights grads are synchronized
for (name, group_ranks, reduce_op), param in sorted(tied_infos_dict.items(), key=lambda x: x[0]):
# Make sure we don't get None for reduce_op
assert reduce_op == dist.ReduceOp.SUM
fp32_grad_buffer = accumulator.get_grad_buffer(name=name)
# Grad buffers are only attached to `param.grad` on the ranks that own the corresponding shard, depending on `param_to_dprank`
fp32_grad = accumulator.parameters[name]["fp32"].grad
# Tied weights are synced using the fp32 grad buffers. Let's make sure they still point to the same memory
# When using ZeroDistributedOptimizer, gradients are sliced across DP
dp_slice_fp_32_grad_buffer = fp32_grad_buffer.view(-1)[slice(*accumulator.param_name_to_offsets[name])]
assert (
dp_slice_fp_32_grad_buffer.data_ptr() == fp32_grad.data_ptr()
), "dp_slice_fp_32_grad_buffer and fp32_grad should point to the same memory"
group = parallel_context.world_ranks_to_pg[group_ranks]
# Check that fp32 grads for tied weights are synced (Used in optimizer step)
# When `reduce_scatter=False`, the entire gradient buffer is all-reduced, so the full buffer ends up synced; with reduce-scatter only the local DP slice is
if reduce_scatter:
assert_tensor_synced_across_pg(
tensor=dp_slice_fp_32_grad_buffer,
pg=group,
msg=lambda err: f"Tied weights's fp32 grads {name} are not synchronized. {err}",
)
else:
assert_tensor_synced_across_pg(
tensor=fp32_grad_buffer,
pg=group,
msg=lambda err: f"Tied weights's fp32 grads {name} are not synchronized. {err}",
)
# Manually sync reference model's tied weights grads
dist.all_reduce(reference_model_accum_ref[name], group=group, op=reduce_op)
# Check that accumulated grads are correct
for name, elt in accumulator.fp32_grad_buffers.items():
fp32_grad = elt["fp32_grad"]
dist.barrier()
if reduce_scatter:
slice_ = slice(*accumulator.param_name_to_offsets[name])
torch.testing.assert_close(
reference_model_accum_ref[name].view(-1)[slice_] / n_micro_batches_per_batch,
fp32_grad.view(-1)[slice_],
atol=1e-6,
rtol=1e-7,
msg=lambda msg: f"Grad for {name} is not correct.\n{msg}",
)
else:
torch.testing.assert_close(
reference_model_accum_ref[name] / n_micro_batches_per_batch,
fp32_grad,
atol=1e-6,
rtol=1e-7,
msg=lambda msg: f"Grad for {name} is not correct.\n{msg}",
)
parallel_context.destroy()
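# Tiny illustration (plain torch; `param_name_to_offsets` here is a hypothetical mapping,
# mirroring the convention relied on above) of how a DP rank addresses its shard: each rank
# owns a contiguous [start, end) window of the flattened fp32 gradient buffer.
import torch

flat_grad = torch.arange(12.0)
param_name_to_offsets = {"dense.weight": (3, 9)}  # offsets owned by this (hypothetical) rank
dp_slice = flat_grad.view(-1)[slice(*param_name_to_offsets["dense.weight"])]
print(dp_slice)  # tensor([3., 4., 5., 6., 7., 8.])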