Commit af4cf80e authored by liangjing

add qwen

parent 77361dae
Pipeline #1872 passed
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
"""Megatron initialization."""
import logging
import random
import os
import time
import numpy as np
import torch
from datetime import timedelta
from megatron.legacy import fused_kernels
from megatron.training import get_adlr_autoresume
from megatron.training import get_args
from megatron.training import get_tensorboard_writer
from megatron.core import mpu, tensor_parallel
from megatron.training.arguments import parse_args, validate_args
from megatron.training.yaml_arguments import validate_yaml
from megatron.training.checkpointing import load_args_from_checkpoint
from megatron.training.global_vars import set_global_variables
from megatron.core.fusions.fused_bias_dropout import bias_dropout_add_fused_train
from megatron.core.fusions.fused_bias_gelu import bias_gelu
from megatron.core.fusions.fused_bias_swiglu import bias_swiglu
logger = logging.getLogger(__name__)

def initialize_megatron(
    extra_args_provider=None,
    args_defaults={},
    ignore_unknown_args=False,
    allow_no_cuda=False,
    skip_mpu_initialization=False,
    get_embedding_ranks=None,
    get_position_embedding_ranks=None
):
    """Set global variables, initialize distributed, and
    set autoresume and random seeds.

    `allow_no_cuda` should not be set unless using megatron for cpu only
    data processing. In general this arg should not be set unless you know
    what you are doing.

    Returns a function to finalize distributed env initialization
    (optionally, only when args.lazy_mpu_init == True)
    """
    if not allow_no_cuda:
        # Make sure cuda is available.
        assert torch.cuda.is_available(), "Megatron requires CUDA."

    # Parse arguments
    args = parse_args(extra_args_provider, ignore_unknown_args)

    # Prep for checkpoint conversion.
    if args.ckpt_convert_format is not None:
        assert args.ckpt_convert_save is not None
        assert args.load is not None
        args.exit_on_missing_checkpoint = True

    if args.use_checkpoint_args or args_defaults.get("use_checkpoint_args", False):
        assert args.load is not None, "--use-checkpoint-args requires --load argument"
        load_args_from_checkpoint(args)

    if args.yaml_cfg is not None:
        args = validate_yaml(args, args_defaults)
    else:
        validate_args(args, args_defaults)

    # set global args, build tokenizer, and set adlr-autoresume,
    # tensorboard-writer, and timers.
    set_global_variables(args)

    # set logging level
    setup_logging()

    # torch.distributed initialization
    def finish_mpu_init():
        args = get_args()
        # Pytorch distributed.
        _initialize_distributed(get_embedding_ranks, get_position_embedding_ranks)

        # Random seeds for reproducibility.
        if args.rank == 0:
            print("> setting random seeds to {} ...".format(args.seed))
        _set_random_seed(args.seed, args.data_parallel_random_init)

    if skip_mpu_initialization:
        return None

    args = get_args()
    if args.lazy_mpu_init:
        # TODO is this still a necessary option?
        args.use_cpu_initialization = True
        # delayed initialization of DDP-related stuff
        # We only set basic DDP globals
        mpu.set_tensor_model_parallel_world_size(args.tensor_model_parallel_size)
        # and return function for external DDP manager
        # to call when it has DDP initialized
        mpu.set_tensor_model_parallel_rank(args.rank)
        return finish_mpu_init
    else:
        # Megatron's MPU is the master. Complete initialization right away.
        finish_mpu_init()

        # Autoresume.
        _init_autoresume()

        # Compile dependencies.
        _compile_dependencies()

        if args.tp_comm_overlap:
            _initialize_tp_communicators()

        # No continuation function
        return None
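
# Usage sketch (hypothetical caller): with `lazy_mpu_init` set,
# `initialize_megatron` only records the tensor-parallel world size/rank and
# hands back `finish_mpu_init`, so an external DDP manager can bring up
# torch.distributed on its own schedule:
#
#     finish = initialize_megatron(args_defaults={'lazy_mpu_init': True})
#     ...                      # external framework sets up its process groups
#     if finish is not None:
#         finish()             # runs _initialize_distributed() and seeds RNGs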

def _compile_dependencies():

    args = get_args()

    # =========================
    # Compile dataset C++ code.
    # =========================
    # TODO: move this to ninja
    if torch.distributed.get_rank() == 0:
        start_time = time.time()
        print("> compiling dataset index builder ...")
        from megatron.core.datasets.utils import compile_helpers

        compile_helpers()
        print(
            ">>> done with dataset index builder. Compilation time: {:.3f} "
            "seconds".format(time.time() - start_time),
            flush=True,
        )

    # ==================
    # Load fused kernels
    # ==================

    # Custom kernel constraints check.
    seq_len = args.seq_length
    attn_batch_size = (
        args.num_attention_heads / args.tensor_model_parallel_size
    ) * args.micro_batch_size
    # Constraints on sequence length and attn_batch_size to enable warp based
    # optimization and upper triangular optimization (for causal mask)
    custom_kernel_constraint = (
        seq_len > 16
        and seq_len <= 16384
        and seq_len % 4 == 0
        and attn_batch_size % 4 == 0
    )
    # Print a warning.
    if not (
        (args.fp16 or args.bf16)
        and custom_kernel_constraint
        and args.masked_softmax_fusion
    ):
        if args.rank == 0:
            print(
                "WARNING: constraints for invoking optimized"
                " fused softmax kernel are not met. We default"
                " back to unfused kernel invocations.",
                flush=True,
            )

    # Always build on rank zero first.
    if torch.distributed.get_rank() == 0:
        start_time = time.time()
        print("> compiling and loading fused kernels ...", flush=True)
        #fused_kernels.load(args)
        torch.distributed.barrier()
    else:
        torch.distributed.barrier()
        #fused_kernels.load(args)
    # Simple barrier to make sure all ranks have passed the
    # compilation phase successfully before moving on to the
    # rest of the program. We think this might ensure that
    # the lock is released.
    torch.distributed.barrier()
    if torch.distributed.get_rank() == 0:
        print(
            ">>> done with compiling and loading fused kernels. "
            "Compilation time: {:.3f} seconds".format(time.time() - start_time),
            flush=True,
        )
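
# Worked example of the constraint check above, using the values from the Qwen
# script in this commit: --seq-length 4096, --num-attention-heads 28,
# --tensor-model-parallel-size 2 and --micro-batch-size 1 give
# attn_batch_size = (28 / 2) * 1 = 14. Since 14 % 4 != 0,
# custom_kernel_constraint is False, the warning above is printed, and the run
# simply falls back to the unfused softmax path.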

def _initialize_tp_communicators():
    """ initializing the communicators with user buffers for high-performance tensor-model-parallel
        communication overlap """

    try:
        import yaml

        import transformer_engine
        from transformer_engine.pytorch import module as te_module

    except ImportError:
        raise RuntimeError("Tensor Parallel Communication/GEMM Overlap optimization needs 'yaml' and "
                           "'transformer_engine' packages")

    args = get_args()

    if args.tp_comm_overlap_cfg is not None:
        with open(args.tp_comm_overlap_cfg, "r") as stream:
            ub_cfgs = yaml.safe_load(stream)
    else:
        ub_cfgs = {}

    input_shape = [(args.seq_length * args.micro_batch_size) // args.context_parallel_size, args.hidden_size]

    #We create a MPI process group, which is needed to bootstrap the pipelined
    #tensor-model-parallel communication overlap
    torch.distributed.new_group(backend='mpi')

    te_module.base.initialize_ub(shape=input_shape, tp_size=args.tensor_model_parallel_size,
                                 use_fp8=(args.fp8 is not None), ub_cfgs=ub_cfgs)

def _initialize_distributed(get_embedding_ranks, get_position_embedding_ranks):
    """Initialize torch.distributed and core model parallel."""
    args = get_args()

    device_count = torch.cuda.device_count()
    if torch.distributed.is_initialized():
        if args.rank == 0:
            print(
                "torch distributed is already initialized, "
                "skipping initialization ...",
                flush=True,
            )
        args.rank = torch.distributed.get_rank()
        args.world_size = torch.distributed.get_world_size()
    else:
        if args.rank == 0:
            print("> initializing torch distributed ...", flush=True)
        # Manually set the device ids.
        if device_count > 0:
            #torch.cuda.set_device(args.local_rank)
            #device_id = torch.device(f'cuda:{args.local_rank}')
            device_id = args.rank % device_count
            if args.local_rank is not None:
                assert (
                    args.local_rank == device_id
                ), "expected local-rank to be the same as rank % device-count."
            else:
                args.local_rank = device_id
            torch.cuda.set_device(device_id)
        else:
            device_id = None

        # Call the init process
        torch.distributed.init_process_group(
            backend=args.distributed_backend,
            world_size=args.world_size,
            rank=args.rank,
            init_method=args.dist_url,
            timeout=timedelta(minutes=args.distributed_timeout_minutes),
        )
        #init_process_group_kwargs = {
        #    'backend' : args.distributed_backend,
        #    'world_size': args.world_size,
        #    'rank': args.rank,
        #    'timeout': timedelta(minutes=args.distributed_timeout_minutes),
        #}
        #torch.distributed.init_process_group(**init_process_group_kwargs)

    # Set the tensor model-parallel, pipeline model-parallel, and
    # data-parallel communicators.
    if device_count > 0:
        if mpu.model_parallel_is_initialized():
            print("model parallel is already initialized")
        else:
            mpu.initialize_model_parallel(
                args.tensor_model_parallel_size,
                args.pipeline_model_parallel_size,
                args.virtual_pipeline_model_parallel_size,
                args.pipeline_model_parallel_split_rank,
                context_parallel_size=args.context_parallel_size,
                expert_model_parallel_size=args.expert_model_parallel_size,
                distributed_timeout_minutes=args.distributed_timeout_minutes,
                nccl_communicator_config_path=args.nccl_communicator_config_path,
                order='tp-cp-ep-dp-pp' if not args.use_tp_pp_dp_mapping else 'tp-pp-dp',
                encoder_tensor_model_parallel_size=args.encoder_tensor_model_parallel_size,
                encoder_pipeline_model_parallel_size=args.encoder_pipeline_model_parallel_size,
                get_embedding_ranks=get_embedding_ranks,
                get_position_embedding_ranks=get_position_embedding_ranks,
            )
            if args.rank == 0:
                print(
                    f"> initialized tensor model parallel with size "
                    f"{mpu.get_tensor_model_parallel_world_size()}"
                )
                print(
                    f"> initialized pipeline model parallel with size "
                    f"{mpu.get_pipeline_model_parallel_world_size()}"
                )

def _init_autoresume():
    """Set autoresume start time."""
    autoresume = get_adlr_autoresume()
    if autoresume:
        torch.distributed.barrier()
        autoresume.init()
        torch.distributed.barrier()

def _set_random_seed(seed_, data_parallel_random_init=False):
    """Set random seed for reproducibility."""
    if seed_ is not None and seed_ > 0:
        # Ensure that different pipeline MP stages get different seeds.
        seed = seed_ + (100 * mpu.get_pipeline_model_parallel_rank())
        # Ensure different data parallel ranks get different seeds
        if data_parallel_random_init:
            seed = seed + (10 * mpu.get_data_parallel_rank())
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.device_count() > 0:
            tensor_parallel.model_parallel_cuda_manual_seed(seed)
    else:
        raise ValueError("Seed ({}) should be a positive integer.".format(seed_))
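
# Worked example: with --seed 1234, a rank on pipeline stage 2 seeds its RNGs
# with 1234 + 100 * 2 = 1434; if data_parallel_random_init is also set and the
# rank sits in data-parallel group 3, the seed becomes 1434 + 10 * 3 = 1464.
# Ranks that differ only in their tensor-parallel coordinate share this base
# seed; model_parallel_cuda_manual_seed() is then responsible for giving
# tensor-parallel ranks distinct CUDA RNG states on top of it.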

def write_args_to_tensorboard():
    """Write arguments to tensorboard."""
    args = get_args()
    writer = get_tensorboard_writer()
    if writer:
        for arg in vars(args):
            writer.add_text(arg, str(getattr(args, arg)), global_step=args.iteration)

def set_jit_fusion_options():
    """Set PyTorch JIT layer fusion options."""
    # flags required to enable jit fusion kernels
    TORCH_MAJOR = int(torch.__version__.split(".")[0])
    TORCH_MINOR = int(torch.__version__.split(".")[1])
    if (TORCH_MAJOR > 1) or (TORCH_MAJOR == 1 and TORCH_MINOR >= 10):
        # nvfuser
        torch._C._jit_set_profiling_executor(True)
        torch._C._jit_set_profiling_mode(True)
        torch._C._jit_override_can_fuse_on_cpu(False)
        torch._C._jit_override_can_fuse_on_gpu(False)
        torch._C._jit_set_texpr_fuser_enabled(False)
        torch._C._jit_set_nvfuser_enabled(False)  #True
        torch._C._debug_set_autodiff_subgraph_inlining(False)
    else:
        # legacy pytorch fuser
        torch._C._jit_set_profiling_mode(False)
        torch._C._jit_set_profiling_executor(False)
        torch._C._jit_override_can_fuse_on_cpu(True)
        torch._C._jit_override_can_fuse_on_gpu(True)

    _warmup_jit_function()

def _warmup_jit_function():
    """Compile JIT functions before the main training steps."""
    args = get_args()
    if args.bf16:
        dtype = torch.bfloat16
    elif args.fp16:
        dtype = torch.float16
    else:
        dtype = torch.float32

    # Warmup fused bias+gelu
    bias = torch.rand(
        args.ffn_hidden_size // args.tensor_model_parallel_size,
        dtype=dtype,
        device="cuda",
    )
    input = torch.rand(
        (
            args.seq_length // args.context_parallel_size,
            args.micro_batch_size,
            args.ffn_hidden_size // args.tensor_model_parallel_size,
        ),
        dtype=dtype,
        device="cuda",
    )
    # Warmup JIT fusions with the input grad_enable state of both forward
    # prop and recomputation
    for bias_grad, input_grad in zip([True, True], [False, True]):
        bias.requires_grad, input.requires_grad = bias_grad, input_grad
        for _ in range(5):
            if args.swiglu:
                output = bias_swiglu(input, bias)
            else:
                output = bias_gelu(bias, input)
    del bias, input, output

    # Warmup fused bias+dropout+add
    if args.sequence_parallel:
        seq_length = args.seq_length // mpu.get_tensor_model_parallel_world_size()
    else:
        seq_length = args.seq_length
    input = torch.rand(
        (seq_length // args.context_parallel_size, args.micro_batch_size, args.hidden_size),
        dtype=dtype,
        device="cuda",
    )
    residual = torch.rand(
        (seq_length // args.context_parallel_size, args.micro_batch_size, args.hidden_size),
        dtype=dtype,
        device="cuda",
    )
    bias = torch.rand((args.hidden_size), dtype=dtype, device="cuda").expand_as(
        residual
    )
    dropout_rate = 0.1
    # Warmup JIT fusions with the input grad_enable state of both forward
    # prop and recomputation
    for input_grad, bias_grad, residual_grad in zip(
        [False, True], [True, True], [True, True]
    ):
        input.requires_grad = input_grad
        bias.requires_grad = bias_grad
        residual.requires_grad = residual_grad
        for _ in range(5):
            output = bias_dropout_add_fused_train([input, bias], residual, dropout_rate)
    del bias, input, residual, output

    torch.cuda.empty_cache()

def setup_logging() -> None:
    """Sets the default logging level based on cmdline args and env vars.

    Precedence:
    1. Command line argument `--logging-level`
    2. Env var `MEGATRON_LOGGING_LEVEL`
    3. Default logging level (INFO)

    Returns: None
    """
    args = get_args()
    logging_level = None

    env_logging_level = os.getenv('MEGATRON_LOGGING_LEVEL', None)
    if env_logging_level is not None:
        logging_level = int(env_logging_level)

    if args.logging_level is not None:
        logging_level = args.logging_level

    if logging_level is not None:
        logger.info(f'Setting logging level to {logging_level}')
        logging.getLogger().setLevel(logging_level)
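
For orientation, a minimal driver sketch that exercises the entry points above (a sketch only: the module path `megatron.training.initialize` and the extra flag are assumptions, while `args_defaults` mirrors how the stock pretrain scripts seed defaults):

# driver_sketch.py -- hypothetical entry point, assuming this file lives at
# megatron/training/initialize.py as in upstream Megatron-LM.
from megatron.training import get_args
from megatron.training.initialize import initialize_megatron, set_jit_fusion_options


def add_extra_args(parser):
    # extra_args_provider lets a caller register its own flags before parsing.
    group = parser.add_argument_group(title='custom')
    group.add_argument('--my-extra-flag', action='store_true')  # hypothetical flag
    return parser


if __name__ == "__main__":
    # Parses args, sets globals (tokenizer, timers, ...), initializes
    # torch.distributed + model-parallel groups, and seeds the RNGs.
    initialize_megatron(
        extra_args_provider=add_extra_args,
        args_defaults={'tokenizer_type': 'QwenTokenizer'},
    )
    set_jit_fusion_options()
    args = get_args()
    print(f"rank {args.rank}: tp={args.tensor_model_parallel_size} "
          f"pp={args.pipeline_model_parallel_size}")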

@@ -1703,6 +1703,7 @@ def _add_data_args(parser):
                                'GPTSentencePieceTokenizer',
                                'HuggingFaceTokenizer',
                                'Llama2Tokenizer',
                                'QwenTokenizer',
                                'TikTokenizer',
                                'NullTokenizer'],
                       help='What type of tokenizer to use.')
......
@@ -15,7 +15,7 @@ from megatron.core.datasets.megatron_tokenizer import MegatronTokenizer
from .bert_tokenization import FullTokenizer as FullBertTokenizer
from .gpt2_tokenization import GPT2Tokenizer
from transformers import Qwen2Tokenizer


def build_tokenizer(args, **kwargs):
    """Initialize tokenizer."""
@@ -49,6 +49,8 @@ def build_tokenizer(args, **kwargs):
    elif args.tokenizer_type == 'Llama2Tokenizer':
        assert args.tokenizer_model is not None
        tokenizer = _Llama2Tokenizer(args.tokenizer_model)
    elif args.tokenizer_type == 'QwenTokenizer':
        tokenizer = _Qwen2Tokenizer(args.vocab_file, args.merge_file)
    elif args.tokenizer_type == 'TikTokenizer':
        assert args.tokenizer_model is not None
        assert args.tiktoken_pattern is not None
@@ -132,6 +134,43 @@ class _HuggingFaceTokenizer(MegatronTokenizer):
    def eod(self):
        return self._tokenizer.eos_token_id


class _Qwen2Tokenizer(MegatronTokenizer):
    def __init__(self, vocab_file, merge_file, extra_vocab_size=0):
        super().__init__(vocab_file, merge_file)
        self.tokenizer = Qwen2Tokenizer(vocab_file, merge_file)
        self.extra_vocab_size = extra_vocab_size
        self.tokenizer.add_special_tokens(special_tokens_dict=dict(pad_token="<|extra_0|>"))

    @property
    def vocab_size(self):
        return len(self.tokenizer.encoder) + self.extra_vocab_size

    @property
    def vocab(self):
        return self.tokenizer.encoder

    @property
    def inv_vocab(self):
        return self.tokenizer.decoder

    def tokenize(self, text):
        return self.tokenizer.encode(text)

    def detokenize(self, token_ids):
        return self.tokenizer.decode(token_ids)

    @property
    def eod(self):
        return self.tokenizer.eos_token_id

    @property
    def eos_token(self):
        return self.tokenizer.eos_token

    @property
    def pad_token_id(self):
        return self.tokenizer.pad_token_id


class _BertWordPieceTokenizer(MegatronTokenizer):
    """Original BERT wordpiece tokenizer."""
......
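
A quick round-trip sketch for the wrapper added above (the import path is assumed from the upstream tokenizer module layout, and the vocab/merges paths are placeholders):

# qwen_tokenizer_check.py -- hypothetical smoke test for the new wrapper.
from megatron.training.tokenizer.tokenizer import _Qwen2Tokenizer  # path assumed

# Placeholder paths: point these at the Qwen vocab.json / merges.txt pair.
tok = _Qwen2Tokenizer('/path_to_qwen_token/vocab.json',
                      '/path_to_qwen_token/merges.txt')

ids = tok.tokenize("hello qwen")      # BPE token ids via Qwen2Tokenizer.encode()
text = tok.detokenize(ids)            # back to a string via decode()
print(text, tok.vocab_size, tok.eod, tok.pad_token_id)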
#!/bin/bash
# Runs the "7B" parameter model
export HSA_FORCE_FINE_GRAIN_PCIE=1
export OMP_NUM_THREADS=1
export NCCL_P2P_LEVEL=SYS
export NCCL_ALGO=Ring
export NCCL_NCHANNELS_PER_PEER=16
export NCCL_MIN_NCHANNELS=20
export NCCL_IB_TIMEOUT=22
export CUDA_DEVICE_MAX_CONNECTIONS=1
export NCCL_IB_HCA=xx #based on your environment
export NCCL_NET_GDR_LEVEL=SYS
export NCCL_NET_GDR_READ=0
source /opt/dtk/env.sh
lrank=$OMPI_COMM_WORLD_LOCAL_RANK
RANK=$OMPI_COMM_WORLD_RANK
WORLD_SIZE=$OMPI_COMM_WORLD_SIZE
CHECKPOINT_PATH=./tmp #$1 #<Specify path>
TENSORBOARD_LOGS_PATH=./tmp #$2 #<Specify path>
DATA_PATH="/path_to_my-qwen_text_document" #<Specify path and file prefix>_text_document
GPT_MODEL_ARGS=(
--num-layers 28
--hidden-size 3584
--ffn-hidden-size 18944
--num-attention-heads 28
--seq-length 4096
--max-position-embeddings 32768
--num-query-groups 4
--group-query-attention
)
TRAINING_ARGS=(
--log-throughput
--transformer-impl local
--use-legacy-models
--micro-batch-size 1
--global-batch-size 12 #512
--train-iters 100
--weight-decay 0.1
--adam-beta1 0.9
--adam-beta2 0.95
--init-method-std 0.006
--clip-grad 1.0
--bf16
--use-distributed-optimizer
--use-flash-attn-triton
--disable-bias-linear
--attention-dropout 0
--hidden-dropout 0
--no-gradient-accumulation-fusion
--add-qkv-bias
--swiglu
--lr 3.0e-5
--lr-decay-style cosine
--min-lr 3.0e-6
--lr-warmup-iters 1
--use-fast-cross-entropy-loss
)
MODEL_PARALLEL_ARGS=(
--sequence-parallel
--tensor-model-parallel-size 2
--pipeline-model-parallel-size 4
)
DATA_ARGS=(
--data-path $DATA_PATH
--split 949,50,1
--untie-embeddings-and-output-weights
--use-rotary-position-embeddings
--normalization RMSNorm
--no-position-embedding
--tokenizer-type QwenTokenizer
--merge-file /path_to_qwen_token/merges.txt
--vocab-file /path_to_qwen_token/vocab.json
)
EVAL_AND_LOGGING_ARGS=(
--log-interval 1
--save-interval 1000
--eval-interval 1000
--save $CHECKPOINT_PATH
--load $CHECKPOINT_PATH
--eval-iters 10
--tensorboard-dir $TENSORBOARD_LOGS_PATH
)
APP="python3 -u pretrain_gpt.py \
${GPT_MODEL_ARGS[@]} \
${TRAINING_ARGS[@]} \
${MODEL_PARALLEL_ARGS[@]} \
${DATA_ARGS[@]} \
${EVAL_AND_LOGGING_ARGS[@]} \
--rank ${RANK} \
--world_size ${WORLD_SIZE} \
--dist_url tcp://${1}:34566 \
"
#for hygon
case ${lrank} in
[0])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=0 --membind=0 ${APP}
;;
[1])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=1 --membind=1 ${APP}
;;
[2])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=2 --membind=2 ${APP}
;;
[3])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=3 --membind=3 ${APP}
;;
[4])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=4 --membind=4 ${APP}
;;
[5])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=5 --membind=5 ${APP}
;;
[6])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=6 --membind=6 ${APP}
;;
[7])
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
numactl --cpunodebind=7 --membind=7 ${APP}
;;
esac
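
For reference, a minimal sketch of the batch/parallelism constraints this configuration has to satisfy at startup (TP=2, PP=4, micro-batch 1, global batch 12; the world size comes from mpirun, so it is treated as a parameter here):

# layout_check.py -- hypothetical helper mirroring the startup checks Megatron
# performs on batch sizes and parallel sizes.
TP, PP = 2, 4            # --tensor-model-parallel-size, --pipeline-model-parallel-size
MICRO, GLOBAL = 1, 12    # --micro-batch-size, --global-batch-size


def microbatches(world_size: int) -> int:
    """Number of gradient-accumulation microbatches per optimizer step."""
    assert world_size % (TP * PP) == 0, "world size must be a multiple of TP * PP"
    dp = world_size // (TP * PP)                 # data-parallel size
    assert GLOBAL % (MICRO * dp) == 0, "global batch must split evenly across DP ranks"
    return GLOBAL // (MICRO * dp)


for ws in (8, 16, 32, 40):
    try:
        print(f"world_size={ws}: dp={ws // (TP * PP)}, microbatches={microbatches(ws)}")
    except AssertionError as err:
        print(f"world_size={ws}: invalid ({err})")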