Commit e4575be9 authored by huaerkl

v1.0
import random
import unittest

import torch
from torch.nn import functional as F

from megatron.model.glu_activations import GLU_ACTIVATIONS, geglu, liglu, reglu, swiglu
from megatron.testing_utils import set_seed, torch_assert_equal


class TestActivations(unittest.TestCase):
    def setUp(self):
        """setup an input of reasonable size"""
        set_seed()
        self.batch_size = random.randint(2, 64)
        self.seq_len = random.randint(256, 1025)
        self.num_channels = random.randint(1, 384) * 2
        self.x = torch.randn(self.batch_size, self.seq_len, self.num_channels)
        self.x1, self.x2 = self.x.chunk(2, dim=-1)
        # glu should halve the last dimension
        self.output_shape = [self.batch_size, self.seq_len, self.num_channels // 2]

    def test_shapes(self):
        for activation_fn in GLU_ACTIVATIONS.values():
            output = activation_fn(self.x)
            self.assertEqual(list(output.shape), self.output_shape)

    def test_liglu(self):
        expected = self.x1 * self.x2
        torch_assert_equal(liglu(self.x), expected)

    def test_geglu(self):
        expected = self.x1 * F.gelu(self.x2)
        torch_assert_equal(geglu(self.x), expected)

    def test_reglu(self):
        expected = self.x1 * F.relu(self.x2)
        torch_assert_equal(reglu(self.x), expected)

    def test_swiglu(self):
        expected = self.x1 * F.silu(self.x2)
        torch_assert_equal(swiglu(self.x), expected)

    # from megatron.testing_utils import require_torch_bf16
    # @require_torch_bf16
    # def test_bf16_jit(self):
    #     x_bf16 = self.x.to(torch.bfloat16)
    #     for activation_fn in GLU_ACTIVATIONS.values():
    #         output = activation_fn(x_bf16)
    #         self.assertEqual(list(output.shape), self.output_shape)


def test_import():
    import megatron
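
# A minimal reference sketch of the GLU-style activations exercised by TestActivations above,
# based only on what the tests assert (chunk the last dimension in half, gate one half with the
# activated other half). The helper name `reference_glu` is illustrative, not the library API.
import torch


def reference_glu(x: torch.Tensor, activation=lambda t: t) -> torch.Tensor:
    # chunking halves the last dimension, which is why the tests construct an even num_channels
    x1, x2 = x.chunk(2, dim=-1)
    return x1 * activation(x2)

# e.g. reference_glu(x, torch.nn.functional.silu) should match swiglu(x),
# and reference_glu(x) with the identity activation should match liglu(x).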
# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import pytest
from pathlib import Path
from parameterized import parameterized
from megatron.testing_utils import (
CaptureStdout,
TestCasePlus,
execute_subprocess_async,
get_gpu_count,
require_deepspeed,
require_torch_gpu,
require_torch_multi_gpu,
set_seed
)
set_seed(42)
def parameterized_custom_name_func(func, param_num, param):
# customize the test name generator function as we want both params to appear in the sub-test
# name, as by default it shows only the first param
param_based_name = parameterized.to_safe_name("_to_".join(str(x) for x in param.args))
return f"{func.__name__}_{param_based_name}"
params = [
# TP_PP_DP
["1_1_1", "1_1_1"],
["2_1_1", "1_1_1"],
["1_2_1", "1_1_1"],
["1_1_2", "1_1_1"],
["2_1_1", "2_1_1"],
["1_1_1", "2_1_1"],
["1_1_1", "1_2_1"],
["1_1_1", "1_1_2"],
["1_1_2", "1_1_2"],
["1_1_2", "2_1_1"],
["1_1_2", "1_2_1"],
["1_2_1", "1_2_1"],
["1_2_1", "2_1_1"],
["1_2_1", "1_1_2"],
["2_1_1", "2_1_1"],
["2_1_1", "1_2_1"],
["2_1_1", "1_1_2"],
["2_2_2", "1_1_1"],
["2_2_2", "2_2_2"],
["1_1_1", "2_2_2"],
["1_1_8", "2_2_2"],
]
def get_launcher(num_gpus):
# 1. explicitly set --num_nodes=1 just in case these tests end up being run on a multi-node setup
# - the tests would not be able to handle that
return f"deepspeed --num_nodes 1 --num_gpus {num_gpus}".split()
@require_deepspeed
@require_torch_gpu
class MegDSTestCheckpoints(TestCasePlus):
""" """
def setUp(self):
super().setUp()
# at times megatron fails to build kernels and doesn't remove the lock file, which makes
# subsequent runs hang, so make sure there is no lock file when starting the tests
meg_lock_file_path = self.repo_root_dir_str + "/megatron/fused_kernels/build/lock"
if os.path.exists(meg_lock_file_path):
os.unlink(meg_lock_file_path)
def get_config(self, output_dir, tp_size, pp_size, dp_size):
data_dir = f"{self.data_dir}/gpt2"
num_gpus = pp_size * tp_size * dp_size
print(f"Using {num_gpus} GPUs")
n_samples = 300 # about 56 iterations
exit_interval = 20 # some samples in the first half and then some more in the 2nd half after resume
seq_len = 128
# XXX: for now while testing shapes make it really short and fast
exit_interval = 1
seq_len = 8
# common/shared configs
ds_args = f"""
--deepspeed
--deepspeed_config {self.test_file_dir_str}/ds_config_bf16.json
--zero-stage 0
--deepspeed-activation-checkpointing
""".split()
args = f"""
--tensor-model-parallel-size {tp_size}
--pipeline-model-parallel-size {pp_size}
--distributed-backend nccl
--log-interval 1
--save-interval 1
--eval-interval 10
--eval-iters 1
--checkpoint-activations
--partition-activations
--exit-interval {exit_interval}
--merge-file {data_dir}/gpt2-tiny-merges.txt
--vocab-file {data_dir}/gpt2-tiny-vocab.json
--save {output_dir}/checkpoints
--load {output_dir}/checkpoints
--data-path {data_dir}/meg-gpt2-openwebtext_text_document
--tensorboard-dir {output_dir}/tensorboard
--tensorboard-queue-size 5
--log-timers-to-tensorboard
--log-batch-size-to-tensorboard
--log-validation-ppl-to-tensorboard
--num-layers 2
--hidden-size 8
--num-attention-heads 2
--seq-length {seq_len}
--max-position-embeddings 8
--micro-batch-size 1
--global-batch-size 16
--train-samples {n_samples}
--embed-layernorm
--position-embedding-type alibi
--optimizer adam
--adam-beta1 0.9
--adam-beta2 0.95
--adam-eps 1e-8
--lr 1e-4
--lr-warmup-samples 5
--lr-decay-samples 6
--clip-grad 1.0
--weight-decay 1e-1
--bf16
--log-level debug
--log-level-replica info
""".split()
# XXX: fails to handle:
#--embed-layernorm
#
# stderr: RuntimeError: Error(s) in loading state_dict for VocabParallelEmbedding:
# stderr: size mismatch for norm.weight: copying a param with shape torch.Size([128]) from checkpoint, the shape in current model is torch.Size([64]).
# stderr: size mismatch for norm.bias: copying a param with shape torch.Size([128]) from checkpoint, the shape in current model is torch.Size([64]).
return args, ds_args, num_gpus
def train_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1):
src_dir = self.src_dir
script = [f"{src_dir}/pretrain_gpt.py"]
args, ds_args, num_gpus = self.get_config(output_dir, tp_size, pp_size, dp_size)
launcher = get_launcher(num_gpus)
cmd = launcher + script + args + ds_args
# keep for quick debug
#print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
# 1. test training from scratch (no checkpoint)
with CaptureStdout() as cs:
execute_subprocess_async(cmd, env=self.get_env())
# test deepspeed is running
self.assertIn("DeepSpeed info", cs.out)
# test reports
self.assertIn("consumed samples", cs.out)
# test there should be no checkpoint this round
self.assertIn(f"Unable to find latest file at {output_dir}/checkpoints/latest", cs.out)
# test checkpoint saving
self.assertIn("successfully saved checkpoint at iteration", cs.out)
def convert_checkpoint_to_universal(self, output_dir, step):
cmd = f"""
python tools/convert_checkpoint/ds_to_universal.py
--input_folder {output_dir}/checkpoints/global_step{step}
--output_folder {output_dir}/checkpoints/global_step{step}_universal
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
with CaptureStdout() as cs:
execute_subprocess_async(cmd, env=self.get_env())
self.assertIn("Convert DeepSpeed Checkpoint to Universal Checkpoint", cs.out)
def resume_from_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1):
src_dir = self.src_dir
script = [f"{src_dir}/pretrain_gpt.py"]
args, ds_args, num_gpus = self.get_config(output_dir, tp_size, pp_size, dp_size)
launcher = get_launcher(num_gpus)
cmd = launcher + script + args + ds_args
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
with CaptureStdout() as cs:
execute_subprocess_async(cmd, env=self.get_env())
# test checkpoint loading
self.assertIn(f"successfully loaded checkpoint from {output_dir}/checkpoints", cs.out)
# test reports
self.assertIn("consumed samples", cs.out)
# test checkpoint saving
self.assertIn("successfully saved checkpoint at iteration", cs.out)
def resume_from_universal_checkpoint(self, output_dir, tp_size=1, pp_size=1, dp_size=1):
src_dir = self.src_dir
script = [f"{src_dir}/pretrain_gpt.py"]
args, ds_args, num_gpus = self.get_config(output_dir, tp_size, pp_size, dp_size)
launcher = get_launcher(num_gpus)
cmd = launcher + script + args + ds_args + ["--universal-checkpoint"]
# keep for quick debug
#print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
with CaptureStdout() as cs:
execute_subprocess_async(cmd, env=self.get_env())
# test checkpoint loading
self.assertIn(f"successfully loaded checkpoint from {output_dir}/checkpoints", cs.out)
# test reports
self.assertIn("consumed samples", cs.out)
# test checkpoint saving
self.assertIn("successfully saved checkpoint at iteration", cs.out)
@require_torch_multi_gpu
@parameterized.expand(params, name_func=parameterized_custom_name_func)
def test_checkpoint_reshaping_main(self, src, tgt):
# this test needs at least 2 gpus - if there are more gpus it will do more extensive testing
tp_size_src, pp_size_src, dp_size_src = list(map(int, src.split('_')))
tp_size_tgt, pp_size_tgt, dp_size_tgt = list(map(int, tgt.split('_')))
n_gpus = get_gpu_count()
n_gpus_src = tp_size_src * pp_size_src * dp_size_src
n_gpus_tgt = tp_size_tgt * pp_size_tgt * dp_size_tgt
if n_gpus_src > n_gpus:
pytest.skip(f"the test requires {n_gpus_src} gpus for the source topology but only {n_gpus} are available")
if n_gpus_tgt > n_gpus:
pytest.skip(f"the test requires {n_gpus_tgt} gpus for the target topology but only {n_gpus} are available")
output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False)
# 1. train with initial topology defined in the first arg of params
self.train_checkpoint(output_dir, tp_size=tp_size_src , pp_size=pp_size_src , dp_size=dp_size_src )
# 2. convert the checkpoint to a universal checkpoint
self.convert_checkpoint_to_universal(output_dir=output_dir, step=1)
# 3. check we can resume training from a reshaped checkpoint to the target topology - the last arg of params
self.resume_from_universal_checkpoint(output_dir, tp_size=tp_size_tgt, pp_size=pp_size_tgt, dp_size=dp_size_tgt)
@require_torch_multi_gpu
def test_checkpoint_reshaping_empty_dir(self):
output_dir = self.get_auto_remove_tmp_dir() # "./xxx", after=False)
with self.assertRaises(RuntimeError) as context:
self.convert_checkpoint_to_universal(output_dir=output_dir, step=1)
import itertools
import os
import shutil
from typing import Set
from unittest.mock import patch
import deepspeed
import torch
import finetune_t0_non_causal_decoder
from megatron import global_vars, get_tokenizer, initialize_megatron, get_args
from megatron.data import mlm_dataset, mtf_dataset, decoder_packed_mtf_dataset
from megatron.data.data_samplers import build_pretraining_data_loader
from megatron.testing_utils import TestCasePlus, flatten_arguments, mockenv_context, torch_assert_equal
def get_default_args():
"""return a dictionary with key as argument name and value as additional arguments"""
return {
# GPT_ARGS
"--num-layers": "2",
"--hidden-size": "128",
"--num-attention-heads": "4",
"--seq-length": "512",
"--max-position-embeddings": "512",
"--micro-batch-size": "4",
"--global-batch-size": "8",
"--lr-decay-iters": "320000",
"--lr-decay-style": "cosine",
"--lr": "0.00015",
"--min-lr": "1.0e-5",
"--train-iters": "5000",
"--tokenizer-type": "PretrainedFromHF",
"--tokenizer-name-or-path": "gpt2",
"--data-impl": "mmap",
"--split": "949,50,1",
"--distributed-backend": "nccl",
"--weight-decay": "1e-2",
"--clip-grad": "1.0",
"--lr-warmup-fraction": ".01",
"--fp16": "",
"--attention-dropout": "0",
"--hidden-dropout": "0",
# OUTPUT_ARGS
"--log-interval": "10",
"--save-interval": "500",
"--eval-interval": "100",
"--eval-iters": "10",
"--checkpoint-activations": "",
# DATA_ARGS
}
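# Note: the dict above is flattened into a sys.argv-style list via
# megatron.testing_utils.flatten_arguments before initialize_megatron() parses it.
# A hedged sketch of that flattening (an assumption about flatten_arguments' behaviour, shown
# only to clarify the `patch('sys.argv', ...)` pattern used below; `_flatten_arguments_sketch`
# is a hypothetical helper, not the real function):
def _flatten_arguments_sketch(args: dict) -> list:
    argv = ["dummy_program_name"]  # argparse ignores argv[0]
    for key, value in args.items():
        argv.append(key)
        if value != "":
            argv.append(value)  # flags with empty values are passed bare, e.g. "--fp16"
    return argv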
def get_dummy_mtf_decoder_packed_data(micro_batch_size: int, seq_length: int, vocab_size: int, special_tokens_ids: Set[int]):
seq_length += 1
num_segments = torch.randint(1, 5, ())
segment_ids = torch.zeros(micro_batch_size, seq_length, dtype=torch.long)
is_inputs = torch.zeros(micro_batch_size, seq_length, dtype=torch.bool)
for batch_id in range(micro_batch_size):
# - `*2`: hack to ensure that two start_new_segments entries are separated by at least two tokens
# - `+1`: hack to ensure that start_new_segments never contains 0
start_new_segments = torch.sort(torch.randperm((seq_length - 2) // 2, )[:num_segments]).values * 2 + 1
segment_ids[batch_id, start_new_segments] = 1
end_inputs = [
torch.randint(low=start_segment, high=end_segment, size=())
for start_segment, end_segment in zip([0, *start_new_segments], [*start_new_segments, seq_length])
]
for end_input, start_segment in zip(end_inputs, [0, *start_new_segments]):
is_inputs[batch_id][start_segment: end_input + 1] = True
segment_ids = torch.cumsum(segment_ids, dim=-1) + 1
tokens = torch.randint(high=vocab_size, size=(micro_batch_size, seq_length), dtype=torch.long)
flatten_token_view = tokens.view(-1,)
for token_id in range(len(flatten_token_view)):
token = flatten_token_view[token_id]
# While the token is a special token, replace it with another token
while token in special_tokens_ids:
flatten_token_view[token_id] = (token + 1) % vocab_size
token = flatten_token_view[token_id]
return {
"decoder_token_ids": tokens,
"decoder_segment_ids": segment_ids,
"decoder_is_inputs": is_inputs
}
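# For illustration (not produced by the function above): with seq_length + 1 == 8 and two packed
# segments, a single row could look like
#   decoder_segment_ids: [1, 1, 1, 1, 2, 2, 2, 2]
#   decoder_is_inputs:   [T, T, F, F, T, F, F, F]
# i.e. each packed segment starts with an input prefix followed by target tokens.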
class TestDataLoading(TestCasePlus):
def setUp(self) -> None:
super().setUp()
# We reset all global variables
global_vars._GLOBAL_ARGS = None
global_vars._GLOBAL_NUM_MICROBATCHES_CALCULATOR = None
global_vars._GLOBAL_TOKENIZER = None
global_vars._GLOBAL_TENSORBOARD_WRITER = None
global_vars._GLOBAL_ADLR_AUTORESUME = None
global_vars._GLOBAL_TIMERS = None
self.dist_env_1_gpu = dict(
MASTER_ADDR="localhost", MASTER_PORT="9994", RANK="0", LOCAL_RANK="0", WORLD_SIZE="1"
)
def copy_data_to_temp(self, root_dir, prefix):
"""copy data to temp, and return paths to temp version"""
src_path = os.path.join(root_dir, prefix)
src_dirname = os.path.dirname(src_path)
tmp_dir = self.get_auto_remove_tmp_dir()
dest_path = os.path.join(tmp_dir, prefix)
dest_dirname = os.path.dirname(dest_path)
os.makedirs(dest_dirname, exist_ok=True)
for folder in os.listdir(src_dirname):
src_folder = os.path.join(src_dirname, folder)
dest_folder = os.path.join(dest_dirname, folder)
if src_folder.startswith(src_path):
if os.path.isdir(src_folder):
shutil.copytree(src_folder, dest_folder)
else:
shutil.copy2(src_folder, dest_folder)
return dest_path
def test_mlm_dataset(self):
command_args = get_default_args()
data_path = self.copy_data_to_temp(self.data_dir, "gpt2/meg-gpt2-openwebtext_text_document")
command_args["--data-path"] = data_path
command_args["--noise-density"] = "0.15"
command_args["--mean-noise-span-length"] = "3"
command_args["--vocab-extra-ids"] = "100"
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
# tokenizer
tokenizer = get_tokenizer()
# A SEP token is required by the MLM preprocessing.
tokenizer.tokenizer.add_special_tokens({"sep_token": "<s>"})
args = get_args()
train_val_test_num_samples = [
args.train_iters * args.global_batch_size,
args.eval_iters * args.global_batch_size,
0
]
train_ds, valid_ds, test_ds = mlm_dataset.build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
# TODO @thomasw21 figure out how that value works
train_valid_test_num_samples=train_val_test_num_samples,
sequence_length=args.seq_length,
noise_density=args.noise_density,
mean_noise_span_length=args.mean_noise_span_length,
seed=args.seed,
skip_warmup=(not args.mmap_warmup)
)
sample = train_ds[0]
# +1 is needed to compute labels, as inputs and targets are just concatenated.
self.assertEqual(len(sample["input_tokens"]) + len(sample["target_tokens"]), args.seq_length + 1)
# We make sure that inputs/targets end with <sep>
self.assertEqual(sample["input_tokens"][-1], tokenizer.sep)
self.assertEqual(sample["target_tokens"][-1], tokenizer.sep)
def test_decoder_packed_mtf_dataloader(self):
command_args = get_default_args()
data_path = self.copy_data_to_temp(self.data_dir, "gpt2/ag_news_prompt")
command_args["--data-path"] = data_path
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
tokenizer = get_tokenizer()
# Hack: `gpt2` doesn't have a padding token, so we override that value.
tokenizer.tokenizer.pad_token_id = tokenizer.tokenizer.eos_token_id
train_val_test_num_samples = [
args.train_iters * args.global_batch_size,
args.eval_iters * args.global_batch_size,
0
]
train_ds, valid_ds, test_ds = decoder_packed_mtf_dataset.build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
# TODO @thomasw21 figure out how that value works
train_valid_test_num_samples=train_val_test_num_samples,
seq_length=args.seq_length + 1,
pad_token=tokenizer.pad,
eos_token=tokenizer.eos,
seed=args.seed,
skip_warmup=(not args.mmap_warmup)
)
batch_iterator = build_pretraining_data_loader(
train_ds, consumed_samples=0, num_workers=4
)
last_padding_size = 0
for i, items in enumerate(batch_iterator):
micro_batch_size, seq_length = items["decoder_token_ids"].shape
# Check dtypes
self.assertEqual(items["decoder_token_ids"].dtype, torch.int64)
self.assertEqual(items["decoder_segment_ids"].dtype, torch.int64)
self.assertEqual(items["decoder_is_inputs"].dtype, torch.bool)
# `micro_batch_size` corresponds to the one passed as an argument
self.assertEqual(micro_batch_size, args.micro_batch_size)
# `seq_length` corresponds to the one passed as an argument, + 1 in order to get tokens/labels
self.assertEqual(seq_length, args.seq_length + 1)
original_samples_count = 0
for batch_id in range(micro_batch_size):
segment_ids = [k for k, _ in itertools.groupby(items["decoder_segment_ids"][batch_id])]
# `segment_ids` is [1,2,...]
self.assertEqual(segment_ids[:-1], list(range(1, len(segment_ids))))
# `0` signifies that the tokens are padding
self.assertIn(segment_ids[-1], [0, len(segment_ids)])
original_samples_count += len([segment_id for segment_id in segment_ids if segment_id != 0])
# Test that we actually pack, i.e. that we have more samples than the `batch_size`
self.assertGreater(original_samples_count, micro_batch_size)
# Test that the first sample of each batch couldn't fit inside the previous batch
first_sample_segment_ids = next(itertools.groupby(items["decoder_segment_ids"][0]))[1]
first_sample_size = len(list(first_sample_segment_ids))
self.assertGreater(first_sample_size, last_padding_size)
# update `last_padding_size`
last_padding_size = len([None for segment_id in items["decoder_segment_ids"][micro_batch_size - 1] if segment_id == 0])
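# Note on the checks above: itertools.groupby collapses runs of equal segment ids,
# e.g. [1, 1, 1, 2, 2, 0, 0] -> [1, 2, 0], so a packed row is expected to yield
# [1, 2, ..., k], optionally followed by a trailing 0 for padding.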
def test_finetune_t0_non_causal_decoder_get_batch_pipe(self):
command_args = get_default_args()
command_args["--position-embedding-type"] = "alibi"
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
tokenizer = get_tokenizer()
# Hack: `gpt2` doesn't have a padding token, so we override that value.
tokenizer.tokenizer.pad_token_id = tokenizer.tokenizer.eos_token_id
# Dummy data
data = get_dummy_mtf_decoder_packed_data(
micro_batch_size=args.micro_batch_size,
seq_length=args.seq_length,
vocab_size=args.padded_vocab_size,
special_tokens_ids={tokenizer.pad}
)
(tokens, position_ids, attention_mask), (labels, loss_mask) = finetune_t0_non_causal_decoder.get_batch_pipe(data)
tokens = tokens.cpu()
position_ids = position_ids.cpu()
attention_mask = attention_mask.cpu()
labels = labels.cpu()
loss_mask = loss_mask.cpu()
self.assertEqual(loss_mask.dtype, torch.float)
torch_assert_equal(loss_mask.bool(), ~data["decoder_is_inputs"][:, 1:] * (data["decoder_token_ids"][:, :-1] != tokenizer.pad))
torch_assert_equal(tokens, data["decoder_token_ids"][:, :-1])
torch_assert_equal(labels, data["decoder_token_ids"][:, 1:])
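# Taken together, the three checks above encode the packed-decoder convention: inputs are
# tokens[:, :-1], labels are tokens[:, 1:], and the loss is only computed on target (non-input)
# positions whose corresponding input token is not the pad token.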
for batch_id in range(args.micro_batch_size):
segment_cuts = torch.nonzero(data["decoder_segment_ids"][batch_id, 1:] - data["decoder_segment_ids"][batch_id, :-1]) + 1
for segment_start, segment_end in zip([0, *segment_cuts], [*segment_cuts, args.seq_length]):
self.assertTrue(torch.all(attention_mask[batch_id, 0, segment_start: segment_end, :segment_start]))
self.assertTrue(torch.all(attention_mask[batch_id, 0, segment_start: segment_end, segment_end:]))
# TODO @thomasw21 make sure that we reset `position_ids`
from random import randint
from typing import Set
from unittest.mock import patch
import deepspeed
import torch
from parameterized import parameterized
from torch import nn
import torch.nn.functional as F
from megatron.enums import AttnMaskType
from megatron.model.fused_layer_norm import MixedFusedLayerNorm
from packaging import version
from megatron import initialize_megatron, get_args, get_tokenizer, global_vars
from megatron.model.fused_softmax import ScaledMaskedSoftmax, FusedScaleMaskSoftmax
from megatron.model.utils import attention_mask_func
from megatron.testing_utils import TestCasePlus, mockenv_context, flatten_arguments, torch_assert_equal, \
torch_assert_close, require_torch_bf16
from megatron.training import setup_model_and_optimizer
import pretrain_gpt
import pretrain_prefix_lm
import finetune_t0_non_causal_decoder
def get_default_args(test_file_dir: str):
"""return a dictionary with key as argument name and value as additional arguments"""
return {
# GPT_ARGS
"--num-layers": "2",
"--hidden-size": "128",
"--num-attention-heads": "4",
"--seq-length": "256",
"--max-position-embeddings": "256",
"--micro-batch-size": "2",
"--global-batch-size": "2",
"--lr-decay-iters": "320000",
"--lr-decay-style": "cosine",
"--lr": "0.00015",
"--min-lr": "1.0e-5",
"--train-iters": "5000",
"--tokenizer-type": "PretrainedFromHF",
"--tokenizer-name-or-path": "gpt2",
"--data-impl": "mmap",
"--split": "949,50,1",
"--distributed-backend": "nccl",
"--weight-decay": "1e-2",
"--clip-grad": "1.0",
"--lr-warmup-fraction": ".01",
"--fp16": "",
"--inference": "",
"--attention-dropout": "0",
"--hidden-dropout": "0",
# OUTPUT_ARGS
"--log-interval": "10",
"--save-interval": "500",
"--eval-interval": "100",
"--eval-iters": "10",
"--checkpoint-activations": "",
# DATA_ARGS
# DeepSpeed args
"--deepspeed": "",
"--deepspeed_config": f"{test_file_dir}/ds_config_inference.json",
"--zero-stage": "0",
}
def equal_vectors(tensor1, tensor2, dim=-1):
"""View tensor1 and tensor2 as a list of vectors, and compute equality"""
return torch.linalg.norm(tensor1 - tensor2, dim=dim) == 0
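# e.g. for two tensors of shape (batch, seq, hidden) this returns a (batch, seq) boolean tensor
# that is True wherever the two hidden vectors are identical.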
def iter_out_of_one(one):
return iter([one])
def get_dummy_mtf_decoder_packed_data(micro_batch_size: int, seq_length: int, vocab_size: int, special_tokens_ids: Set[int]):
"""Code from `tests/test_dataloaders.py"""
seq_length += 1
num_segments = torch.randint(1, 5, ())
segment_ids = torch.zeros(micro_batch_size, seq_length, dtype=torch.long)
is_inputs = torch.zeros(micro_batch_size, seq_length, dtype=torch.bool)
for batch_id in range(micro_batch_size):
# - `*2`: hack to ensure that two start_new_segments entries are separated by at least two tokens
# - `+1`: hack to ensure that start_new_segments never contains 0
start_new_segments = torch.sort(torch.randperm((seq_length - 2) // 2, )[:num_segments]).values * 2 + 1
segment_ids[batch_id, start_new_segments] = 1
end_inputs = [
torch.randint(low=start_segment, high=end_segment - 1, size=())
for start_segment, end_segment in zip([0, *start_new_segments], [*start_new_segments, seq_length])
]
for end_input, start_segment in zip(end_inputs, [0, *start_new_segments]):
is_inputs[batch_id][start_segment: end_input + 1] = True
segment_ids = torch.cumsum(segment_ids, dim=-1) + 1
tokens = torch.randint(high=vocab_size, size=(micro_batch_size, seq_length), dtype=torch.long)
flatten_token_view = tokens.view(-1,)
for token_id in range(len(flatten_token_view)):
token = flatten_token_view[token_id]
# While the token is a special token, replace it with another token
while token in special_tokens_ids:
flatten_token_view[token_id] = (token + 1) % vocab_size
token = flatten_token_view[token_id]
return {
"decoder_token_ids": tokens,
"decoder_segment_ids": segment_ids,
"decoder_is_inputs": is_inputs
}
class MyTestCase(TestCasePlus):
def setUp(self) -> None:
super().setUp()
# We reset all global variables
global_vars._GLOBAL_ARGS = None
global_vars._GLOBAL_NUM_MICROBATCHES_CALCULATOR = None
global_vars._GLOBAL_TOKENIZER = None
global_vars._GLOBAL_TENSORBOARD_WRITER = None
global_vars._GLOBAL_ADLR_AUTORESUME = None
global_vars._GLOBAL_TIMERS = None
self.dist_env_1_gpu = dict(
MASTER_ADDR="localhost", MASTER_PORT="9994", RANK="0", LOCAL_RANK="0", WORLD_SIZE="1"
)
def test_gpt(self):
"""Test causal invariance, ie past token don't depend on future tokens."""
command_args = get_default_args(self.test_file_dir_str)
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
tokenizer = get_tokenizer()
model, _, _ = setup_model_and_optimizer(pretrain_gpt.model_provider)
model = model[0]
model._config.train_micro_batch_size_per_gpu = args.micro_batch_size
model.set_train_batch_size(args.micro_batch_size)
token_ids = torch.randint(args.padded_vocab_size, (args.micro_batch_size, args.seq_length))
# eod is a special token
token_ids[token_ids == tokenizer.eod] += 1
token_ids[token_ids == tokenizer.eod] %= args.padded_vocab_size
# get a modified version of the first batch, we change a specific index
changed_index = randint(0, args.seq_length - 2)
token_ids_changed = token_ids.clone()
# We increment the token_id by one for that index in order to artificially change the sequence.
token_ids_changed[:, changed_index] = \
(token_ids_changed[:, changed_index] + 1) % args.padded_vocab_size
output = model.eval_batch(iter_out_of_one({"text": token_ids}), compute_loss=False)
output_changed = model.eval_batch(iter_out_of_one({"text": token_ids_changed}), compute_loss=False)
# All tokens in the past should be unchanged
torch_assert_equal(output[:, :changed_index], output_changed[:, :changed_index])
# All tokens in the future should have changed
self.assertFalse(
torch.any(equal_vectors(output[:, changed_index:], output_changed[:, changed_index:]))
)
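# A self-contained toy illustration of the causal-invariance property checked above, using a
# cumulative sum as a stand-in "causal model" (illustrative only, unrelated to the GPT model):
#
#     x = torch.randn(1, 8)
#     x_changed = x.clone()
#     x_changed[0, 5] += 1.0                              # perturb position 5
#     y, y_changed = x.cumsum(-1), x_changed.cumsum(-1)
#     assert torch.equal(y[0, :5], y_changed[0, :5])      # outputs before the change are identical
#     assert not torch.equal(y[0, 5:], y_changed[0, 5:])  # outputs from the change onwards differ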
def test_prefix_lm_reset_attention_mask(self):
"""
Test prefix invariances when `reset_attention_mask=True`:
- Past target tokens don't depend on future target tokens.
- Target tokens depend on input tokens.
- Input tokens depend on all other input tokens, but never target tokens.
"""
command_args = get_default_args(self.test_file_dir_str)
command_args["--reset-attention-mask"] = ""
command_args["--loss-on-targets-only"] = ""
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
tokenizer = get_tokenizer()
model, _, _ = setup_model_and_optimizer(pretrain_prefix_lm.model_provider)
model = model[0]
model._config.train_micro_batch_size_per_gpu = args.micro_batch_size
model.set_train_batch_size(args.micro_batch_size)
# we run the batch preprocessing manually instead of relying on batch_fn
model.set_batch_fn(None)
token_ids = torch.randint(args.padded_vocab_size, (args.micro_batch_size, args.seq_length))
# eod is a special token; avoiding it also guarantees that the whole row is treated as a single document.
token_ids[token_ids == tokenizer.eod] += 1
token_ids[token_ids == tokenizer.eod] %= args.padded_vocab_size
# process the batch so that it has a non-empty prefix
input_batch, (labels, loss_mask), prefix_indices = pretrain_prefix_lm.get_batch_pipe({"text": token_ids})
for batch_id in range(len(prefix_indices)):
for id in prefix_indices[batch_id]:
self.assertTrue(loss_mask[batch_id, id] == 1)
self.assertTrue(id > 0)
# Make sure that the last prefix token predicts the first token.
self.assertTrue(loss_mask[batch_id, id -1] == 1)
output = model.eval_batch(iter_out_of_one((input_batch, (labels, loss_mask), prefix_indices)), compute_loss=False)
## --------------- CHANGE A TARGET TOKEN ---------------------------
# get a modified version of the first batch
# guaranteed to exist as each row has at least one partial document
changed_target_index = prefix_indices[0][0]
token_ids_changed_target = input_batch[0].clone()
# We increment the token id on the changed index.
token_ids_changed_target[0, changed_target_index] = \
(token_ids_changed_target[0, changed_target_index] + 1) % args.padded_vocab_size
# make sure we're not changing a token to eod as it's a special token
token_ids_changed_target[token_ids_changed_target == tokenizer.eod] += 1
token_ids_changed_target[token_ids_changed_target == tokenizer.eod] %= args.padded_vocab_size
# Test change
output_changed_target = model.eval_batch(iter_out_of_one(((token_ids_changed_target, *input_batch[1:]), (labels, loss_mask), prefix_indices)), compute_loss=False)
# All tokens in the past should be unchanged
torch_assert_equal(output[0, :changed_target_index], output_changed_target[0, :changed_target_index])
# All tokens in the future should have changed
self.assertFalse(
torch.any(
equal_vectors(output[0, changed_target_index:], output_changed_target[0, changed_target_index:])
)
)
# Rows that were not modified should not change either
torch_assert_equal(output[1, :], output_changed_target[1, :])
## --------------- CHANGE AN INPUT TOKEN ---------------------------
# Let's change the last prefix token and make sure that the outputs change
# guaranteed to be positive as we avoided the pathological case previously
last_prefix_index = prefix_indices[0][0] - 1
token_ids_changed_input = input_batch[0].clone()
# We increment the token id on the changed index.
token_ids_changed_input[0, last_prefix_index] = \
(token_ids_changed_input[0, last_prefix_index] + 1) % args.padded_vocab_size
# make sure we're not changing a token to eod as it's a special token
token_ids_changed_input[token_ids_changed_input == tokenizer.eod] += 1
token_ids_changed_input[token_ids_changed_input == tokenizer.eod] %= args.padded_vocab_size
output_changed_input = model.eval_batch(iter_out_of_one(((token_ids_changed_input, *input_batch[1:]), (labels, loss_mask), prefix_indices)), compute_loss=False)
# All tokens should be changed
self.assertFalse(
torch.any(
equal_vectors(output[0, :], output_changed_input[0, :])
)
)
# Rows that were not modified should not change either
torch_assert_equal(output[1, :], output_changed_input[1, :])
def test_prefix_lm_wo_reset_attention_mask(self):
"""
Test prefix invariances when `reset_attention_mask=False`:
- Past target tokens don't depend on future target tokens.
- Target tokens depend on input tokens.
- Input tokens depend on all other input tokens, but never target tokens.
"""
command_args = get_default_args(self.test_file_dir_str)
command_args["--loss-on-targets-only"] = ""
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
model, _, _ = setup_model_and_optimizer(pretrain_prefix_lm.model_provider)
model = model[0]
model._config.train_micro_batch_size_per_gpu = args.micro_batch_size
model.set_train_batch_size(args.micro_batch_size)
# we run the batch preprocessing manually instead of relying on batch_fn
model.set_batch_fn(None)
token_ids = torch.randint(args.padded_vocab_size, (args.micro_batch_size, args.seq_length))
input_batch, (labels, loss_mask), prefix_indices = pretrain_prefix_lm.get_batch_pipe({"text": token_ids})
for batch_id in range(len(prefix_indices)):
id = prefix_indices[batch_id]
self.assertTrue(loss_mask[batch_id, id] == 1)
self.assertTrue(id > 0)
# Make sure that the last prefix token predicts the first token.
self.assertTrue(loss_mask[batch_id, id -1] == 1)
model.eval_batch(iter_out_of_one((input_batch, (labels, loss_mask), prefix_indices)), compute_loss=False)
#TODO: Check all invariants
def test_gpt_rotary_embeddings(self):
"""Test rotary embeddings"""
command_args = get_default_args(self.test_file_dir_str)
del command_args["--max-position-embeddings"]
command_args["--position-embedding-type"] = "rotary"
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
tokenizer = get_tokenizer()
model, _, _ = setup_model_and_optimizer(pretrain_gpt.model_provider)
model = model[0]
model._config.train_micro_batch_size_per_gpu = args.micro_batch_size
model.set_train_batch_size(args.micro_batch_size)
token_ids = torch.randint(args.padded_vocab_size, (args.micro_batch_size, args.seq_length))
# eod is a special token
token_ids[token_ids == tokenizer.eod] += 1
token_ids[token_ids == tokenizer.eod] %= args.padded_vocab_size
model.eval_batch(iter_out_of_one({"text": token_ids}), compute_loss=False)
#TODO: Check all invariants
@require_torch_bf16
def test_fused_layer_norm(self):
command_args = get_default_args(self.test_file_dir_str)
# Condition to use custom cuda kernel
command_args["--bf16"] = ""
del command_args["--fp16"]
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
initialize_megatron()
args = get_args()
dummy_input = torch.randn(args.micro_batch_size, args.seq_length, args.hidden_size, device="cuda", dtype=torch.bfloat16)
normalized_shape = (args.hidden_size,)
epsilon = 1e-5
mfln = MixedFusedLayerNorm(normalized_shape, eps=epsilon)
self.assertTrue(mfln.use_meg_ds_fused_layer_norm, "Expected model to use Megatron-DeepSpeed custom cuda kernel for LayerNorm.")
self.assertTrue(args.bf16, "Test has to be done in half precision.")
# We set the weight and bias manually to simulate a state other than the default initialization
weight = torch.randn(args.hidden_size, device="cuda", dtype=torch.bfloat16)
bias = torch.randn(args.hidden_size, device="cuda", dtype=torch.bfloat16)
mfln.weight = nn.Parameter(weight)
mfln.bias = nn.Parameter(bias)
mfln_output = mfln(dummy_input)
# We check that our layernorm matches PyTorch from 1.11 onwards
if version.parse(torch.__version__) >= version.parse("1.11.0"):
torch_layer_norm_output = F.layer_norm(dummy_input, normalized_shape, weight, bias, eps=epsilon)
else:
# In this case we can check that it basically corresponds to the fp32 version
torch_layer_norm_output = F.layer_norm(dummy_input.float(), normalized_shape, weight.float(), bias.float(), eps=epsilon).to(torch.bfloat16)
torch_assert_equal(mfln_output, torch_layer_norm_output)
@parameterized.expand([(attn_mask_type,) for attn_mask_type in AttnMaskType])
def test_fused_masked_softmax(self, attn_mask_type: AttnMaskType):
command_args = get_default_args(self.test_file_dir_str)
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
initialize_megatron()
args = get_args()
dummy_input = torch.randn(
args.micro_batch_size,
args.num_attention_heads,
args.seq_length,
args.seq_length,
device="cuda",
dtype=args.params_dtype
)
if attn_mask_type == AttnMaskType.causal:
dummy_attention_mask = None
else:
dummy_attention_mask = torch.randn(
args.micro_batch_size,
1, # `args.num_attention_heads` not implemented in our cuda kernel
args.seq_length,
args.seq_length,
device="cuda",
dtype=args.params_dtype
) < 0
scale = torch.rand(())
fused_scaled_softmax = FusedScaleMaskSoftmax(
input_in_fp16=args.params_dtype == torch.float16,
input_in_bf16=args.params_dtype == torch.bfloat16,
attn_mask_type=attn_mask_type,
scaled_masked_softmax_fusion=True,
mask_func=attention_mask_func,
softmax_in_fp32=True,
scale=scale,
)
unfused_scaled_softmax = FusedScaleMaskSoftmax(
input_in_fp16=args.params_dtype == torch.float16,
input_in_bf16=args.params_dtype == torch.bfloat16,
attn_mask_type=attn_mask_type,
scaled_masked_softmax_fusion=False,
mask_func=attention_mask_func,
softmax_in_fp32=True,
scale=scale,
)
self.assertTrue(fused_scaled_softmax.is_kernel_available(dummy_attention_mask, *dummy_input.size()))
fused_output = fused_scaled_softmax(dummy_input, dummy_attention_mask)
self.assertFalse(unfused_scaled_softmax.is_kernel_available(dummy_attention_mask, *dummy_input.size()))
unfused_output = unfused_scaled_softmax(dummy_input, dummy_attention_mask)
# Test that the nonzero positions are consistent with the mask
for i in range(args.num_attention_heads):
if dummy_attention_mask is None:
# Make sure it's causal: values in the lower triangle should be nonzero.
non_zero_values = torch.tril(torch.ones_like(fused_output[:, i]))
torch_assert_equal(torch.nonzero(fused_output[:, i]), torch.nonzero(non_zero_values))
else:
torch_assert_equal(torch.nonzero(fused_output[:, i]), torch.nonzero(~dummy_attention_mask[:, 0]))
# Cuda kernel produces slightly different results
torch_assert_close(fused_output, unfused_output)
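# For reference, the unfused path is expected to be numerically close to plain PyTorch ops
# along these lines (a hedged sketch, not the FusedScaleMaskSoftmax implementation; the -10000.0
# fill value is an assumption about attention_mask_func):
#
#     scores = dummy_input * scale
#     if dummy_attention_mask is not None:
#         scores = scores.masked_fill(dummy_attention_mask, -10000.0)
#     probs = torch.softmax(scores.float(), dim=-1).to(dummy_input.dtype)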
def test_non_causal_decoder_model_with_packed_input_passed_with_attention_mask_is_not_causal_across_segments(self):
command_args = get_default_args(self.test_file_dir_str)
command_args["--position-embedding-type"] = "alibi"
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**self.dist_env_1_gpu):
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
tokenizer = get_tokenizer()
# Hack: `gpt2` doesn't have a padding token, so we override that value.
tokenizer.tokenizer.pad_token_id = tokenizer.tokenizer.eos_token_id
data = get_dummy_mtf_decoder_packed_data(
micro_batch_size=args.micro_batch_size,
seq_length=args.seq_length,
vocab_size=args.padded_vocab_size,
special_tokens_ids={tokenizer.pad}
)
model, _, _ = setup_model_and_optimizer(finetune_t0_non_causal_decoder.model_provider)
model = model[0]
model._config.train_micro_batch_size_per_gpu = args.micro_batch_size
model.set_train_batch_size(args.micro_batch_size)
output = model.eval_batch(iter_out_of_one(data), compute_loss=False)
## --------------- CHANGE A TARGET TOKEN ---------------------------
# change the first token of the first batch item to a different (non-special) value
change_batch_id = 0
change_token_id = 0
token_ids_changed = data["decoder_token_ids"].clone()
# We increment the token id on the changed index.
token_ids_changed[change_batch_id, change_token_id] = (token_ids_changed[change_batch_id, change_token_id] + 1) % args.padded_vocab_size
while token_ids_changed[change_batch_id, change_token_id] in {tokenizer.eod, tokenizer.pad}:
token_ids_changed[change_batch_id, change_token_id] = (token_ids_changed[change_batch_id, change_token_id] + 1) % args.padded_vocab_size
# Test change
output_changed_target = model.eval_batch(iter_out_of_one({**data, "decoder_token_ids": token_ids_changed}), compute_loss=False)
first_segment_first_batch_id_end = (torch.nonzero(data["decoder_segment_ids"][change_batch_id, 1:] - data["decoder_segment_ids"][change_batch_id, :-1]) + 1)[0]
# Check that values changed in segment 1 of batch_id 0
self.assertFalse(torch.any(
equal_vectors(
output[change_batch_id, change_token_id:first_segment_first_batch_id_end],
output_changed_target[change_batch_id, change_token_id:first_segment_first_batch_id_end]
)
))
# Check that values did not change in other segments of batch_id 0
torch_assert_equal(
output[change_batch_id, first_segment_first_batch_id_end:],
output_changed_target[change_batch_id, first_segment_first_batch_id_end:]
)
# Check that values did not change in other segments in other batches
non_change_ids = torch.arange(output.shape[0]) != change_batch_id
torch_assert_equal(output[non_change_ids], output_changed_target[non_change_ids])
## --------------- CHANGE THE LAST TOKEN TO PADDING ---------------------------
# change the last token in the first batch item to a pad token
token_ids_changed_pad = data["decoder_token_ids"].clone()
segment_ids_changed_pad = data["decoder_segment_ids"].clone()
# We replace the last token with the pad token and mark its segment id as padding.
token_ids_changed_pad[change_batch_id, -1] = tokenizer.pad
segment_ids_changed_pad[change_batch_id, -1] = 0
# Test model handles padding correctly
output_changed_pad = model.eval_batch(iter_out_of_one({**data, "decoder_token_ids": token_ids_changed_pad, "decoder_segment_ids": segment_ids_changed_pad}), compute_loss=False)
self.assertFalse(torch.any(torch.isnan(output_changed_pad)))
# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import filecmp
import io
import json
import re
import os
import unittest
import functools
from pathlib import Path
from megatron.testing_utils import (
TestCasePlus,
execute_subprocess_async,
set_seed
)
from datasets import load_dataset
set_seed(42)
def write_jsonl(path, lines_num=1000, line_length=1024):
def get_text_line(line_length):
# XXX: fix to generate line_length
return "It's a wonderful world. I'm just walking on air. Talk of heaven on earth. I've got more than my share. Haven't got a care. Happy all day through. It's a wonderful world. Loving wonderful you!"
with io.open(path, "w", encoding="utf-8") as f:
for i in range(lines_num):
rec = dict(text=get_text_line(line_length))
x = json.dumps(rec, indent=0, ensure_ascii=False)
x = re.sub(r'\n', ' ', x, 0, re.M)
f.write(x + "\n")
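# Each line written above is one JSON record on a single line, e.g.
#   {"text": "It's a wonderful world. I'm just walking on air. ..."}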
@functools.lru_cache()
def download_hf_dataset(dsetname):
return load_dataset(dsetname)
class MegDSTestPreprocessing(TestCasePlus):
""" """
def setUp(self):
super().setUp()
def test_preprocess_data(self):
src_dir = self.src_dir
data_dir = f"{self.data_dir}/gpt2"
output_dir = self.get_auto_remove_tmp_dir() # "./xxx", after=False)
# autogenerate "input.jsonl"
input_path = f"{output_dir}/input.jsonl"
write_jsonl(input_path)
output_prefix =f"{output_dir}/test-ds"
cmd = f"""
python {src_dir}/tools/preprocess_data.py
--input {input_path}
--output-prefix {output_prefix}
--dataset-impl mmap
--tokenizer-type GPT2BPETokenizer
--merge-file {data_dir}/gpt2-tiny-merges.txt
--vocab {data_dir}/gpt2-tiny-vocab.json
--append-eod
--workers 2
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
for ext in ["bin", "idx"]:
tgt_path = f"{output_prefix}_text_document.{ext}"
self.assertTrue(Path(tgt_path).exists(), )
def compare_meg_data_files(self, tgt, ref):
for ext in ["bin", "idx"]:
tgt_path = f"{tgt}.{ext}"
ref_path = f"{ref}.{ext}"
self.assertTrue(Path(tgt_path).exists(), )
self.assertTrue(filecmp.cmp(tgt_path, ref_path, shallow=False))
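# Note: shallow=False makes filecmp compare file contents byte by byte rather than just
# os.stat() signatures, so the .bin/.idx outputs must match exactly.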
def preprocess_partitioned_dataset(self, output_dir, dsetname, splitname, linelimit, numparts):
"""Preprocess a dataset as a whole and in shards to prepare environment for merge test.
Load specified HF dataset using given split and record limit.
Write the dataset to a jsonl file and preprocess.
Also split dataset into numparts contiguous shards, write each shard to its own jsonl, and preprocess each.
Return path to the full dataset and a list of paths for each shard."""
src_dir = self.src_dir
data_dir = f"{self.data_dir}/gpt2"
# preprocess_data_dist requires one to have already downloaded the input HF dataset.
# We do that by running this script before the test.
dset = download_hf_dataset(dsetname)[splitname]
# limit the test to use the first linelimit entries to be faster
dset = dset.select(range(linelimit))
# write jsonl file of full dataset
json_ds = f"{output_dir}/ds-full.jsonl"
dset.to_json(json_ds)
# process full jsonl into indexed dataset file
ds_full = f"{output_dir}/ds-full"
cmd = f"""
python {src_dir}/tools/preprocess_data.py
--input {json_ds}
--output-prefix {ds_full}
--dataset-impl mmap
--tokenizer-type GPT2BPETokenizer
--merge-file {data_dir}/gpt2-tiny-merges.txt
--vocab {data_dir}/gpt2-tiny-vocab.json
--append-eod
""".split()
ds_full += '_text_document'
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
# write each part to its own json file
ds_parts = []
for i in range(numparts):
json_part = f"{output_dir}/ds-part-{i}.jsonl"
dset.shard(numparts, i, contiguous=True).to_json(json_part)
ds_part = f"{output_dir}/ds-part-{i}"
ds_parts.append(ds_part + '_text_document')
cmd = f"""
python {src_dir}/tools/preprocess_data.py
--input {json_part}
--output-prefix {ds_part}
--dataset-impl mmap
--tokenizer-type GPT2BPETokenizer
--merge-file {data_dir}/gpt2-tiny-merges.txt
--vocab {data_dir}/gpt2-tiny-vocab.json
--append-eod
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
return ds_full, ds_parts
def test_merge_serial(self):
"""Check that serial merge of partial dataset files produces the same file as the full dataset."""
src_dir = self.src_dir
output_dir = self.get_auto_remove_tmp_dir() # "./xxx", after=False)
# process full dataset, and process the full dataset as 3 contiguous chunks
ds_full, ds_parts = self.preprocess_partitioned_dataset(output_dir, 'stas/openwebtext-10k', 'train', 100, 3)
# merge the part files into a single indexed dataset
ds_merged = f"{output_dir}/ds-merged"
cmd = f"""
python {src_dir}/tools/merge_preprocessed_data.py
--datasets {" ".join(ds_parts)}
--output-prefix {ds_merged}
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
# the full dataset and the merged dataset should be identical
self.compare_meg_data_files(ds_full, ds_merged)
def test_merge_distributed(self):
"""Check that serial merge of partial dataset files produces the same file as the full dataset."""
src_dir = self.src_dir
output_dir = self.get_auto_remove_tmp_dir() # "./xxx", after=False)
# process full dataset, and process the full dataset as 3 contiguous chunks
ds_full, ds_parts = self.preprocess_partitioned_dataset(output_dir, 'stas/openwebtext-10k', 'train', 100, 3)
# merge the part files into a single indexed dataset
ds_merged = f"{output_dir}/ds-merged"
cmd = f"""
python -m torch.distributed.launch --nproc_per_node 6 {src_dir}/tools/merge_preprocessed_data.py
--merge distributed
--datasets {" ".join(ds_parts)}
--output-prefix {ds_merged}
--torch-backend gloo
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
# the full dataset and the merged dataset should be identical
self.compare_meg_data_files(ds_full, ds_merged)
def test_process_data_microsoft(self):
"""We want to be stable to Microsoft version."""
src_dir = self.src_dir
data_dir = f"{self.data_dir}/gpt2"
output_dir = self.get_auto_remove_tmp_dir() # "./xxx", after=False)
input_path = f"{self.tests_dir}/data/gpt2/openwebtext-1000.jsonl"
output_prefix = f"{output_dir}/test-ds-meg-gpt2-openwebtext"
cmd = f"""
python {src_dir}/tools/preprocess_data.py
--input {input_path}
--output-prefix {output_prefix}
--dataset-impl mmap
--tokenizer-type GPT2BPETokenizer
--merge-file {data_dir}/gpt2-tiny-merges.txt
--vocab {data_dir}/gpt2-tiny-vocab.json
--append-eod
--workers 2
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
self.compare_meg_data_files(f"{output_prefix}_text_document", f"{data_dir}/meg-gpt2-openwebtext_text_document")
def test_process_data_dist_microsoft(self):
"""We want to be stable to Microsoft version."""
src_dir = self.src_dir
data_dir = f"{self.data_dir}/gpt2"
output_dir = self.get_auto_remove_tmp_dir() # "./xxx", after=False)
output_prefix = f"{output_dir}/test-ds-meg-gpt2-openwebtext_1k"
# preprocess_data_dist requires one to have already downloaded the input HF dataset.
# We do that by running this script before the test.
dsetname = 'stas/openwebtext-10k'
download_hf_dataset(dsetname)
cmd = f"""
python -m torch.distributed.launch --nproc_per_node 2 {src_dir}/tools/preprocess_data_dist.py
--input {dsetname}
--count 1000
--output-prefix {output_prefix}
--dataset-impl mmap
--tokenizer-type GPT2BPETokenizer
--merge-file {data_dir}/gpt2-tiny-merges.txt
--vocab {data_dir}/gpt2-tiny-vocab.json
--append-eod
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
self.compare_meg_data_files(f"{output_prefix}_text_document", f"{data_dir}/meg-gpt2-openwebtext_text_document")
def test_process_data_dist_serial_microsoft(self):
"""We want to be stable to Microsoft version."""
src_dir = self.src_dir
data_dir = f"{self.data_dir}/gpt2"
output_dir = self.get_auto_remove_tmp_dir() # "./xxx", after=False)
output_prefix = f"{output_dir}/test-ds-meg-gpt2-openwebtext_1k"
# preprocess_data_dist requires one to have already downloaded the input HF dataset.
# We do that by running this script before the test.
dsetname = 'stas/openwebtext-10k'
download_hf_dataset(dsetname)
cmd = f"""
python -m torch.distributed.launch --nproc_per_node 2 {src_dir}/tools/preprocess_data_dist.py
--input {dsetname}
--count 1000
--merge serial
--output-prefix {output_prefix}
--dataset-impl mmap
--tokenizer-type GPT2BPETokenizer
--merge-file {data_dir}/gpt2-tiny-merges.txt
--vocab {data_dir}/gpt2-tiny-vocab.json
--append-eod
""".split()
# keep for quick debug
# print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
execute_subprocess_async(cmd, env=self.get_env())
self.compare_meg_data_files(f"{output_prefix}_text_document", f"{data_dir}/meg-gpt2-openwebtext_text_document")
import unittest
from random import randint
from unittest.mock import patch
import deepspeed
import torch
import logging
import numpy as np
import pytest
from megatron import initialize_megatron, get_args, get_tokenizer, global_vars
from megatron.testing_utils import TestCasePlus, mockenv_context, flatten_arguments, require_deepspeed, require_torch_multi_gpu
from megatron.training import setup_model_and_optimizer
from megatron.mpu.mappings import gather_from_tensor_model_parallel_region
from pretrain_gpt import model_provider as gpt_model_provider, get_batch_pipe as get_gpt_batch_pipe
from pretrain_prefix_lm import model_provider as prefix_lm_model_provider, get_batch_pipe as get_prefix_lm_batch_pipe
import multiprocessing as mp
from multiprocessing import Pool
from megatron.checkpointing import save_checkpoint
from megatron.utils import get_ltor_masks_and_position_ids
@require_deepspeed
@require_torch_multi_gpu
class MegDSTestTP(TestCasePlus):
def get_default_args(self):
"""return a dictionary with key as argument name and value as additional arguments"""
data_dir = f"{self.data_dir}/gpt2"
return {
# GPT_ARGS
"--num-layers": "2",
"--hidden-size": "128",
"--num-attention-heads": "4",
"--seq-length": "256",
"--max-position-embeddings": "256",
"--micro-batch-size": "4",
"--global-batch-size": "8",
"--lr-decay-iters": "320000",
"--lr-decay-style": "cosine",
"--lr": "0.00015",
"--min-lr": "1.0e-5",
"--train-iters": "5000",
"--tokenizer-type": "GPT2BPETokenizer",
"--merge-file": f"{data_dir}/gpt2-tiny-merges.txt",
"--vocab-file": f"{data_dir}/gpt2-tiny-vocab.json",
"--data-impl": "mmap",
"--split": "949,50,1",
"--distributed-backend": "nccl",
"--weight-decay": "1e-2",
"--clip-grad": "1.0",
"--lr-warmup-fraction": ".01",
"--fp16": "",
"--attention-dropout": "0",
"--hidden-dropout": "0",
# OUTPUT_ARGS
"--log-interval": "10",
"--save-interval": "500",
"--eval-interval": "100",
"--eval-iters": "10",
"--checkpoint-activations": "",
#ds args
"--deepspeed": "",
"--deepspeed_config":f"{self.test_file_dir_str}/ds_config.json",
"--zero-stage": "1",
"--deepspeed-activation-checkpointing": ""
# DATA_ARGS
}
def setUp(self) -> None:
super().setUp()
# We reset all global variables
global_vars._GLOBAL_ARGS = None
global_vars._GLOBAL_NUM_MICROBATCHES_CALCULATOR = None
global_vars._GLOBAL_TOKENIZER = None
global_vars._GLOBAL_TENSORBOARD_WRITER = None
global_vars._GLOBAL_ADLR_AUTORESUME = None
global_vars._GLOBAL_TIMERS = None
def infer_model(args):
tp_index, tp_size, command_args, token_ids, save, load = args
dist_env = dict(
MASTER_ADDR="localhost", MASTER_PORT="9991", RANK=str(tp_index), LOCAL_RANK=str(tp_index), WORLD_SIZE=str(tp_size)
)
logging.getLogger().critical("Process: starting")
#Hack
import megatron.initialize as init
init.git_ds_info = lambda: None
with patch('sys.argv', flatten_arguments(command_args)):
with mockenv_context(**dist_env):
def create_model_inputs(tokens):
args = get_args()
attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
tokens,
tokenizer.eod,
args.reset_position_ids,
args.reset_attention_mask,
args.eod_mask_loss,
prefix_indices=None,
loss_on_targets_only=False)
return (tokens, position_ids, attention_mask), (tokens, loss_mask)
deepspeed.init_distributed()
initialize_megatron()
args = get_args()
tokenizer = get_tokenizer()
model, _, _ = setup_model_and_optimizer(gpt_model_provider)
model = model[0]
if load is not None:
# Hack (same as in eval_harness/evaluate.py)
# Loading pipelined models in deepspeed with a different TP than they were originally trained with fails
# due to a sanity check that makes sure that all state_dicts that we merge contain attention layers.
# This, however, is not true for pipelining, where we also merge the state_dict for the embeddings,
# which does not contain these attention-specific keys.
#
# Deepspeed does however manage to load the model if we just turn off this sanity check.
deepspeed.runtime.state_dict_factory.MegatronSDLoader.sanity_check = lambda self, ckpt_file_name: None
zero_enabled = model._config.zero_enabled
model._config.zero_enabled = False
_, _ = model.load_checkpoint(load, load_optimizer_states=False, load_lr_scheduler_states=False, load_module_only=True)
model._config.zero_enabled = zero_enabled
if token_ids is None:
token_ids = torch.randint(args.padded_vocab_size, (args.micro_batch_size, args.seq_length))
# eod is a special token
token_ids[token_ids == tokenizer.eod] += 1
token_ids[token_ids == tokenizer.eod] %= args.padded_vocab_size
else:
token_ids = torch.tensor(token_ids)
model.micro_batches = 1
model.set_batch_fn(create_model_inputs)
# process batch
input_batch = get_gpt_batch_pipe({"text": token_ids})[0]
# get a modified version of the first batch, we change a specific index
changed_index = randint(0, args.seq_length - 2)
input_token_ids_changed = input_batch[0].clone()
# We increment the token_id by one for that index in order to artificially change the sequence.
input_token_ids_changed[:, changed_index] = \
(input_token_ids_changed[:,changed_index] + 1) % args.padded_vocab_size
output = model.eval_batch(iter([token_ids]), compute_loss = False, reduce_output = None)[0]
output = gather_from_tensor_model_parallel_region(output)
if save is not None:
args.save = save
save_checkpoint(0, [model], None, None)
return (output[0].detach().cpu().numpy(), token_ids.detach().cpu().numpy())
def test_alibi_tp(self):
mp.set_start_method('spawn', force=True)
cp_dir = self.get_auto_remove_tmp_dir()
command_args = self.get_default_args()
command_args["--pad-vocab-size-to"] = "5120" # This is equal to 128 * 40 which is above the len of gp2-tiny vocabulary
command_args["--position-embedding-type"] = "alibi"
command_args["--tensor-model-parallel-size"] = "1"
pool = Pool(1)
result = pool.map(MegDSTestTP.infer_model, [((0, 1, command_args, None, cp_dir, None))])
pool.close()
pool.join()
output, tokens = result[0]
logging.getLogger().info("First done!")
command_args["--tensor-model-parallel-size"] = "2"
pool = Pool(2)
result = pool.map(MegDSTestTP.infer_model, [((0, 2, command_args, tokens, None, cp_dir)), ((1, 2, command_args, tokens, None, cp_dir))])
pool.close()
pool.join()
output2, tokens = result[0]
logging.getLogger().critical(output-output2)
self.assertTrue(np.allclose(output,output2, atol=5e-3, rtol=0), "Different results when running with TP=1 and TP=2")
def test_embedding_matrix_tp(self):
mp.set_start_method('spawn', force=True)
cp_dir = self.get_auto_remove_tmp_dir()
command_args = self.get_default_args()
command_args["--pad-vocab-size-to"] = "5120" # This is equal to 128 * 40 which is above the len of gp2-tiny vocabulary
command_args["--seq-length"] = "4"
command_args["--micro-batch-size"] = "2"
tokens = [[5119, 0, 1, 5100],[0, 1, 5111, 5101]]
command_args["--tensor-model-parallel-size"] = "1"
pool = Pool(1)
# tp_index, tp_size, command_args, token_ids, save, load
result = pool.map(MegDSTestTP.infer_model, [((0, 1, command_args, tokens, cp_dir, None))])
pool.close()
pool.join()
output, _ = result[0]
logging.getLogger().info("First done!")
command_args["--tensor-model-parallel-size"] = "2"
pool = Pool(2)
result = pool.map(MegDSTestTP.infer_model, [((0, 2, command_args, tokens, None, cp_dir)), ((1, 2, command_args, tokens, None, cp_dir))])
pool.close()
pool.join()
output2, _ = result[0]
logging.getLogger().critical(output-output2)
self.assertTrue(np.allclose(output,output2, atol=5e-3, rtol=0), "Different results when running with TP=1 and TP=2")
def test_embedding_matrix_tp_with_invalid_tokens_ids(self):
mp.set_start_method('spawn', force=True)
command_args = self.get_default_args()
command_args["--pad-vocab-size-to"] = "5120" # This is equal to 128 * 40 which is above the len of gp2-tiny vocabulary
command_args["--seq-length"] = "4"
command_args["--micro-batch-size"] = "2"
tokens = [[5120, 0, 1, 2],[0, 1, 3, 4]]
command_args["--tensor-model-parallel-size"] = "1"
pool = Pool(1)
with pytest.raises(Exception) as exc_info:
_ = pool.map(MegDSTestTP.infer_model, [((0, 1, command_args, tokens, None, None))])
pool.close()
pool.join()
self.assertIn("There is an input id in the input that is greater than the highest possible input id" , str(exc_info.value))
logging.getLogger().info("First done!")
command_args["--tensor-model-parallel-size"] = "2"
pool = Pool(2)
with pytest.raises(Exception) as exc_info:
_ = pool.map(MegDSTestTP.infer_model, [((0, 2, command_args, tokens, None, None)), ((1, 2, command_args, tokens, None, None))])
pool.close()
pool.join()
self.assertIn("There is an input id in the input that is greater than the highest possible input id", str(exc_info.value))
def test_tokenizer_vocab_size_multiple_of_tp_size(self):
mp.set_start_method('spawn', force=True)
command_args = self.get_default_args()
command_args["--pad-vocab-size-to"] = "5121" # This is equal to 128 * 40 + 1 which is above the len of gp2-tiny vocabulary
command_args["--micro-batch-size"] = "4"
command_args["--tensor-model-parallel-size"] = "2"
command_args["--make-vocab-size-divisible-by"] = "1"
pool = Pool(2)
with pytest.raises(Exception) as exc_info:
_ = pool.map(MegDSTestTP.infer_model, [((0, 2, command_args, None, None, None)), ((1, 2, command_args, None, None, None))])
pool.close()
pool.join()
self.assertEqual(str(exc_info.value), "5121 is not divisible by 2")
def test_tokenizer_raise_error_make_vocab_size_divisible_by(self):
mp.set_start_method('spawn', force=True)
command_args = self.get_default_args()
command_args["--pad-vocab-size-to"] = "5121" # This is equal to 128 * 40 + 1 which is above the len of gp2-tiny vocabulary
command_args["--micro-batch-size"] = "4"
pool = Pool(2)
with pytest.raises(Exception) as exc_info:
_ = pool.map(MegDSTestTP.infer_model, [((0, 2, command_args, None, None, None)), ((1, 2, command_args, None, None, None))])
pool.close()
pool.join()
self.assertEqual(str(exc_info.value), "5121 is not divisible by 128")
if __name__ == '__main__':
unittest.main()