#!/bin/bash
# Runs the "307M" parameter Retro model.
export CUDA_DEVICE_MAX_CONNECTIONS=1
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NUM_NODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NUM_NODES))
CHECKPOINT_PATH=$1 #<Specify path>
TENSORBOARD_LOGS_PATH=$2 #<Specify path>
DISTRIBUTED_ARGS=(
--nproc_per_node $GPUS_PER_NODE
--nnodes $NUM_NODES
--node_rank $NODE_RANK
--master_addr $MASTER_ADDR
--master_port $MASTER_PORT
)
######## GPT or Retro? ########
# 0 : GPT.
# 1 : Retro
ADD_RETRIEVER=1
######## Megatron, Retro dirs. ########
RETRO_PROJECT_DIR="<path/to/retro/project/directory>"
######## Model, training args. ########
# ** Note: --seq-length auto loaded from Retro project dir.
RETRO_MODEL_ARGS=(
--num-layers 32
--hidden-size 2048
--num-attention-heads 32
)
# ** Note: --data-path, --tokenizer-type, and --tokenizer-model auto loaded from Retro project dir.
DATA_ARGS=(
--split 98,2,0
)
MODEL_PARALLEL_ARGS=(
--tensor-model-parallel-size 8
--pipeline-model-parallel-size 1
)
# ** Note: --eval-interval, --eval-iters auto loaded from Retro project dir.
EVAL_AND_LOGGING_ARGS=(
--log-interval 100
--save-interval 10000
--eval-interval 1000
--save $CHECKPOINT_PATH
--load $CHECKPOINT_PATH
--eval-iters 10
--tensorboard-dir $TENSORBOARD_LOGS_PATH
)
TRAINING_ARGS=" \
--retro-project-dir ${RETRO_PROJECT_DIR} \
--transformer-impl transformer_engine \
--num-workers 8 \
--micro-batch-size 4 \
--lr-decay-samples 166400000 \
--lr-warmup-samples 162761 \
--lr 6.0e-4 \
--min-lr 6.0e-5 \
--lr-decay-style cosine \
--clip-grad 1.0 \
--weight-decay 0.1 \
--adam-beta1 0.9 \
--adam-beta2 0.95 \
--init-method-std 0.023 \
--log-params-norm \
--log-num-zeros-in-grad \
--bf16 \
--no-data-sharding \
"
if [ "$ADD_RETRIEVER" = "1" ]; then
TRAINING_ARGS+=" --retro-add-retriever"
fi
######## Command. ########
torchrun ${DISTRIBUTED_ARGS[@]} pretrain_retro.py \
${RETRO_MODEL_ARGS[@]} \
${TRAINING_ARGS} \
${MODEL_PARALLEL_ARGS[@]} \
${DATA_ARGS[@]} \
${EVAL_AND_LOGGING_ARGS[@]}
import os
import torch
from torch.optim import Adam
from torch.utils.data import DataLoader
from functools import partial
from pathlib import Path
from megatron.core import parallel_state
from megatron.core import dist_checkpointing
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.models.gpt.gpt_model import GPTModel
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec
from megatron.core.datasets.utils import compile_helpers
from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder
from megatron.core.datasets.gpt_dataset import GPTDatasetConfig, MockGPTDataset
from megatron.training.tokenizer.tokenizer import _NullTokenizer
_SEQUENCE_LENGTH = 64
def initialize_distributed(tensor_model_parallel_size=1, pipeline_model_parallel_size=1):
parallel_state.destroy_model_parallel()
# Torch setup for distributed training
rank = int(os.environ['LOCAL_RANK'])
world_size = torch.cuda.device_count()
torch.cuda.set_device(rank)
torch.distributed.init_process_group(world_size=world_size, rank=rank)
# Megatron core distributed training initialization
parallel_state.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size)
def model_provider():
"""Build the model."""
transformer_config = TransformerConfig(
num_layers=2,
hidden_size=12,
num_attention_heads=4,
use_cpu_initialization=True,
pipeline_dtype=torch.float32,
)
gpt_model = GPTModel(
config=transformer_config,
transformer_layer_spec=get_gpt_layer_local_spec(),
vocab_size=100,
max_sequence_length=_SEQUENCE_LENGTH,
)
return gpt_model
def get_train_data_iterator():
if torch.distributed.is_available() and torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
compile_helpers()
torch.distributed.barrier()
else:
compile_helpers()
config = GPTDatasetConfig(
random_seed=0,
sequence_length=_SEQUENCE_LENGTH,
reset_position_ids=False,
reset_attention_mask=False,
eod_mask_loss=False,
tokenizer=_NullTokenizer(vocab_size=_SEQUENCE_LENGTH),
)
datasets = BlendedMegatronDatasetBuilder(
MockGPTDataset, [1000, None, None], lambda: True, config
).build()
train_dataloader = DataLoader(datasets[0], batch_size=8, shuffle=True)
train_iterator = iter(train_dataloader)
return train_iterator
def forward_step_func(data_iterator, model):
def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):
losses = output_tensor.float()
loss_mask = loss_mask.view(-1).float()
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# If you have data parallel reduce loss across data parallel groups.
# If pipeline parallel, loss computation is done only in last stage.
return loss, {'lm loss': loss}
data = next(data_iterator)
tokens = data['tokens'].to(device)
attention_mask = data['attention_mask'].to(device)
position_ids = data['position_ids'].to(device)
labels = data['labels'].to(device)
loss_mask = data['loss_mask'].to(device)
output_tensor = model(tokens, position_ids, attention_mask,
labels=labels)
return output_tensor, partial(loss_func, loss_mask)
def save_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict = gpt_model.sharded_state_dict(prefix='')
dist_checkpointing.save(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
def load_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict=gpt_model.sharded_state_dict(prefix='')
checkpoint = dist_checkpointing.load(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
gpt_model.load_state_dict(checkpoint)
return gpt_model
if __name__ == "__main__":
initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1)
model_parallel_cuda_manual_seed(123)
gpt_model = model_provider()
device = torch.device("cuda")
gpt_model.to(device)
optim = Adam(gpt_model.parameters())
train_iterator = get_train_data_iterator()
forward_backward_func = get_forward_backward_func()
# Running the model for 5 iterations
for _ in range(5):
optim.zero_grad()
losses_reduced = forward_backward_func(
forward_step_func=forward_step_func,
data_iterator=train_iterator,
model=gpt_model,
num_microbatches=1,
seq_length=_SEQUENCE_LENGTH,
micro_batch_size=8,
decoder_seq_length=_SEQUENCE_LENGTH,
forward_only=False)
optim.step()
print(f'Losses reduced : {losses_reduced}')
# Saving the model
ckpt_path = os.getcwd() + '/ckpt'
Path(ckpt_path).mkdir(exist_ok=True)
save_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path=ckpt_path)
# Loading the model
gpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path=ckpt_path)
gpt_model.to(device)
print('Successfully loaded the model')
# T5 MODEL
## Table of contents
- [1. Training Setup](#1-training-setup)
- [2. Configurations](#2-configurations)
- [3. Training Results](#3-training-results)
## 1. Training setup
<a id="markdown-training-setup" name="training-setup"></a>
To run the model on a Slurm-based cluster:
```
PYTORCH_IMAGE=nvcr.io/nvidia/pytorch:23.09-py3
ACCOUNT_NAME=""
PARTITION=""
JOB_NAME=""
NUM_NODES=1
CHECKPOINT_PATH="" #<Specify path to checkpoint>
TENSORBOARD_LOGS_PATH="" #<Specify path to tensorboard log>
VOCAB_FILE="" #<Specify path to file>/bert-large-cased-vocab.txt
DATA_PATH="" #<Specify path and file prefix>_text_document
srun -N $NUM_NODES --container-image $PYTORCH_IMAGE --container-mounts "/path/to/data:/path/to/data,/path/to/megatron-lm:/workspace/megatron-lm" --account $ACCOUNT_NAME -J $JOB_NAME -p $PARTITION --no-container-mount-home bash -c "
cd /workspace/megatron-lm
./examples/t5/train_t5_220m_distributed.sh $CHECKPOINT_PATH $TENSORBOARD_LOGS_PATH $VOCAB_FILE $DATA_PATH"
```
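The same script can also be launched directly (without Slurm) from inside a container that has the repository and data mounted; the arguments are the same placeholders as above:
```
cd /workspace/megatron-lm
./examples/t5/train_t5_220m_distributed.sh $CHECKPOINT_PATH $TENSORBOARD_LOGS_PATH $VOCAB_FILE $DATA_PATH
```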
## 2. Configurations
<a id="markdown-configurations" name="configurations"></a>
The architecture arguments below show the configuration for the T5 220M model.
### 220M
```
--num-layers 12 \
--hidden-size 768 \
--num-attention-heads 12 \
--kv-channels 64 \
--ffn-hidden-size 3072 \
--encoder-seq-length 512 \
--decoder-seq-length 128 \
--max-position-embeddings 512 \
--tensor-model-parallel-size 1 \
--pipeline-model-parallel-size 1 \
```
## 3. Training Results
<a id="markdown-training-results" name="training-results"></a>
Below is the training curve for the 220M model on the Pile dataset. Training takes 4 days on 32 GPUs with a batch size of 2048.
After finetuning on the SQuAD dataset, the validation result is 63.44%.
<p align="center">
<img src="./t5_mcore_train_curve.png" width="800" height="400">
</p>
#!/bin/bash
# Runs the "220M" parameter model
export CUDA_DEVICE_MAX_CONNECTIONS=1
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NUM_NODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NUM_NODES))
CHECKPOINT_PATH=$1 #<Specify path>
TENSORBOARD_DIR=$2 #<Specify path>
VOCAB_FILE=$3 #<Specify path to file>/bert-large-cased-vocab.txt
DATA_PATH=$4 #<Specify path and file prefix>_text_document
DISTRIBUTED_ARGS="
--nproc_per_node $GPUS_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
--master_port $MASTER_PORT
"
T5_ARGS="
--encoder-num-layers 12 \
--decoder-num-layers 12 \
--hidden-size 768 \
--num-attention-heads 12 \
--kv-channels 64 \
--ffn-hidden-size 3072 \
--encoder-seq-length 512 \
--decoder-seq-length 128 \
--max-position-embeddings 512 \
--micro-batch-size 64 \
--global-batch-size 512 \
--lr 0.0001 \
--train-iters 1000000 \
--lr-decay-iters 1000000 \
--lr-decay-style linear \
--min-lr 0.00001 \
--weight-decay 1e-2 \
--lr-warmup-fraction .01 \
--clip-grad 1.0 \
--bf16 \
--vocab-extra-ids 100 \
--init-method-std 0.015 \
--transformer-impl transformer_engine \
--tensor-model-parallel-size 1 \
--pipeline-model-parallel-size 1 \
"
DATA_ARGS="
--data-path $DATA_PATH \
--vocab-file $VOCAB_FILE \
--tokenizer-type BertWordPieceCase \
--split 99982,9,9 \
"
OUTPUT_ARGS="
--log-interval 100 \
--tensorboard-dir ${TENSORBOARD_DIR} \
--save-interval 500 \
--eval-interval 1000 \
--eval-iters 10
"
torchrun $DISTRIBUTED_ARGS pretrain_t5.py \
$T5_ARGS \
$DATA_ARGS \
$OUTPUT_ARGS \
--distributed-backend nccl \
--save $CHECKPOINT_PATH \
--load $CHECKPOINT_PATH
.jet_common:
stage: functional_tests
rules:
- if: '$FUNCTIONAL_TEST == "yes" && $CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME != $CI_DEFAULT_BRANCH && $CI_MERGE_REQUEST_TARGET_BRANCH_NAME !~ /^core_r/ )'
allow_failure: true
- if: '$FUNCTIONAL_TEST == "yes"'
- when: never
default:
id_tokens:
VAULT_JWT_TOKEN:
aud: https://stg.vault.nvidia.com
include:
- project: dl/jet/gitlab-templates
ref: main
file: downstreams.yml
jet-configure:
image:
name: mikefarah/yq:4.35.2
entrypoint: [""]
extends: [.jet_common, .jet-configure]
tags:
- os/linux
script:
- set -x
- JET_FILTER=${JET_CUSTOM_FILTER:-False}
- echo "_JET_FILTER=$JET_FILTER" | tee -a jet.env
- |
IMAGE=${CI_MCORE_IMAGE}:${CI_PIPELINE_ID} yq '. |=
(
select(.spec.name == "mcore-pyt")
| .spec.source.image = env(IMAGE)
)
' -i tests/functional_tests/jet_recipes/build-pyt.yaml
IMAGE=${CI_NEMO_IMAGE}:${CI_PIPELINE_ID} yq '. |=
(
select(.spec.name == "mcore-nemo")
| .spec.source.image = env(IMAGE)
)
' -i tests/functional_tests/jet_recipes/build-pyt.yaml
artifacts:
reports:
dotenv: jet.env
paths:
- tests/functional_tests/jet_recipes
retry:
max: 2
when: job_execution_timeout
jet-trigger:
extends: [.jet_common, .jet-trigger]
needs: [metadata, jet-configure]
trigger:
project: dl/jet/ci
branch: $JET_CI_BRANCH
strategy: depend
variables:
JET_WORKLOADS_FILTER: '$_JET_FILTER'
inherit:
variables: true
jet-results-summary:
extends: [.jet_common]
image: gitlab-master.nvidia.com:5005/dl/jet/api:latest
tags:
- os/linux
before_script:
- jet secrets jwt-login jwt/nvidia/gitlab-master adlr-megatron-lm-ci $VAULT_JWT_TOKEN
script:
- env
- RW_API_TOKEN=${PROJECT_ACCESS_TOKEN} ENDPOINT=${PROJECT_ENDPOINT} bash tests/functional_tests/shell_test_utils/restart_jet_log_jobs.sh ${CI_PIPELINE_ID}
- python -m pip install -U --no-cache-dir prettytable
- rc=0
- python tests/functional_tests/python_test_utils/jet_test_pipeline.py ${CI_PIPELINE_ID} --artifact_links $CI_JOB_ID --download_scripts_dir ./scripts || rc=$?
- exit $rc
artifacts:
when: always
paths:
- scripts
allow_failure: true
## Quick Start
The following guide will show you how to quickly get started with Megatron Core. It will cover the following:
* We will initialize Megatron Core on 2 GPUs.
* We will build a GPT model with tensor model parallel size 2 and pipeline parallel size 1.
* We will train it for a few iterations using Megatron Core schedules.
* We will save the model using the distributed checkpointing format.
* We will load the model saved above.
*NOTE: The following has been tested with Megatron Core version 0.8.0 and the NGC PyTorch container version 24.02.*
### Environment Setup
```
docker run --ipc=host --shm-size=512m --gpus 2 -it nvcr.io/nvidia/pytorch:24.02-py3
git clone https://github.com/NVIDIA/Megatron-LM.git && cd Megatron-LM
```
<br>
### Writing Your First Training Loop
The following steps will walk you through how to create a sample GPT model split across tensors (tensor model parallelism) on 2 GPUs, and run a forward pass through it using a mock GPT dataset helper class that we created in Megatron Core.
<br>
**NOTE: All of the following steps are already put into a script [run_simple_mcore_train_loop.py](https://github.com/NVIDIA/Megatron-LM/tree/main/examples/run_simple_mcore_train_loop.py) which you can run as follows**
```
PYTHONPATH=$PYTHONPATH:./megatron torchrun --nproc-per-node 2 examples/run_simple_mcore_train_loop.py
```
<br>
**STEP 1 - Initialize Distributed Training and Model parallel setup**
The following utility, when called, initializes your distributed setup.
```python
import os
import torch
from megatron.core import parallel_state
def initialize_distributed(tensor_model_parallel_size = 1, pipeline_model_parallel_size = 1):
# Torch setup for distributed training
rank = int(os.environ['LOCAL_RANK'])
world_size = torch.cuda.device_count()
torch.cuda.set_device(rank)
torch.distributed.init_process_group(world_size=world_size, rank=rank)
# Megatron core distributed training initialization
parallel_state.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size)
```
<br>
**STEP 2 - GPT Model Setup**
The following step shows you how to quickly create a GPT model. For a list of other configs that you can pass into the model, look at [transformer_config.py](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/transformer/transformer_config.py).
```python
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.models.gpt.gpt_model import GPTModel
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec
def model_provider():
"""Build the model."""
transformer_config = TransformerConfig(
num_layers=2,
hidden_size=12,
num_attention_heads=4,
use_cpu_initialization=True,
pipeline_dtype=torch.float32)
gpt_model = GPTModel(
config=transformer_config,
transformer_layer_spec=get_gpt_layer_local_spec(),
vocab_size=100,
max_sequence_length=64)
return gpt_model
```
<br>
**STEP 3 - GPT Mock dataset setup**
The following shows you how to quickly get started with a mock dataset utility we created. To train on your own data, use the actual GPTDataset class in [gpt_dataset.py](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/datasets/gpt_dataset.py); a hedged sketch of such a configuration follows the code block below.
For more information about the Megatron Core data pipeline, please refer to [this](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/datasets/readme.md?ref_type=heads).
```python
import torch
from torch.utils.data import DataLoader
from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder
from megatron.core.datasets.gpt_dataset import GPTDatasetConfig, MockGPTDataset
from megatron.training.tokenizer.tokenizer import _NullTokenizer
from megatron.core.datasets.utils import compile_helpers
_SEQUENCE_LENGTH = 64
def get_train_data_iterator():
if torch.distributed.is_available() and torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
compile_helpers()
torch.distributed.barrier()
else:
compile_helpers()
config = GPTDatasetConfig(
random_seed=0,
sequence_length=_SEQUENCE_LENGTH,
reset_position_ids=False,
reset_attention_mask=False,
eod_mask_loss=False,
tokenizer=_NullTokenizer(vocab_size=_SEQUENCE_LENGTH),
)
datasets = BlendedMegatronDatasetBuilder(
MockGPTDataset, [1000, None, None], lambda: True, config
).build()
train_dataloader = DataLoader(datasets[0], batch_size=8, shuffle=True)
train_iterator = iter(train_dataloader)
return train_iterator
```
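As mentioned above, to train on real data you would use `GPTDataset` instead of the mock class. Below is a minimal, hedged sketch (illustration only): the data prefix is a placeholder for your own `.bin`/`.idx` prefix, and `_NullTokenizer` merely stands in for whatever real tokenizer matches your corpus.
```python
from torch.utils.data import DataLoader
from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder
from megatron.core.datasets.gpt_dataset import GPTDataset, GPTDatasetConfig
from megatron.training.tokenizer.tokenizer import _NullTokenizer

def get_real_train_data_iterator():
    config = GPTDatasetConfig(
        random_seed=0,
        sequence_length=_SEQUENCE_LENGTH,
        blend=(["path/to/corpus_text_document"], None),  # placeholder .bin/.idx prefix
        split="98,2,0",                                   # train/valid/test weighting
        reset_position_ids=False,
        reset_attention_mask=False,
        eod_mask_loss=False,
        tokenizer=_NullTokenizer(vocab_size=_SEQUENCE_LENGTH),  # swap in your real tokenizer
    )
    # Build train/valid/test splits from the same corpus and wrap the train split in a DataLoader
    train_ds, valid_ds, test_ds = BlendedMegatronDatasetBuilder(
        GPTDataset, [1000, 100, 0], lambda: True, config
    ).build()
    train_dataloader = DataLoader(train_ds, batch_size=8, shuffle=True)
    return iter(train_dataloader)
```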
<br>
**STEP 4 - Forward Step Function**
In Megatron Core, we use [schedules.py](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/pipeline_parallel/schedules.py) to run the model, so it is sufficient to define a forward step function that takes the data iterator and the model as input and produces the output tensor and a loss function.
```python
from functools import partial
def forward_step_func(data_iterator, model):
def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):
losses = output_tensor.float()
loss_mask = loss_mask.view(-1).float()
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# If you have data parallel reduce loss across data parallel groups.
# If pipeline parallel, loss computation is done only in last stage.
return loss, {'lm loss': loss}
data = next(data_iterator)
tokens = data['tokens'].to(device)
attention_mask = data['attention_mask'].to(device)
position_ids = data['position_ids'].to(device)
labels = data['labels'].to(device)
loss_mask = data['loss_mask'].to(device)
output_tensor = model(tokens, position_ids, attention_mask,
labels=labels)
return output_tensor, partial(loss_func, loss_mask)
```
<br>
**STEP 5 - Load and Save Distributed Checkpoint**
Megatron Core uses distributed checkpointing for loading and saving models. This gives you the flexibility to convert a model from one model parallel setting to another when you load it (e.g., a model trained with tensor parallel size 2 can later be loaded with tensor model parallel size 4), as illustrated after the main function in Step 6 below.
```python
from megatron.core import dist_checkpointing
def save_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict = gpt_model.sharded_state_dict(prefix='')
dist_checkpointing.save(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
def load_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict=gpt_model.sharded_state_dict(prefix='')
checkpoint = dist_checkpointing.load(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
gpt_model.load_state_dict(checkpoint)
return gpt_model
```
<br>
**STEP 6 - Main Function**
The following is the main function that needs to go into your script.
```python
from pathlib import Path
from torch.optim import Adam
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed
if __name__ == "__main__":
initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1)
model_parallel_cuda_manual_seed(123)
gpt_model = model_provider()
device = torch.device("cuda")
gpt_model.to(device)
optim = Adam(gpt_model.parameters())
train_iterator = get_train_data_iterator()
forward_backward_func = get_forward_backward_func()
# Running the model for 5 iterations
for _ in range(5):
optim.zero_grad()
losses_reduced = forward_backward_func(
forward_step_func=forward_step_func,
data_iterator=train_iterator,
model=gpt_model,
num_microbatches=1,
seq_length=64,
micro_batch_size=8,
decoder_seq_length=64,
forward_only=False)
optim.step()
print(f'Losses reduced : {losses_reduced}')
# Saving the model
save_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')
# Loading the model
gpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')
gpt_model.to(device)
print('Successfully loaded the model')
```
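As a hedged illustration of the re-sharding flexibility mentioned in Step 5 (this is not a snippet from the repository), the checkpoint saved above with tensor parallel size 2 could be reloaded in a separate run launched with `torchrun --nproc-per-node 4`, reusing the helpers defined in this guide:
```python
# Illustration only: a fresh 4-GPU run reloading the checkpoint saved above with TP=2.
initialize_distributed(tensor_model_parallel_size=4, pipeline_model_parallel_size=1)
model_parallel_cuda_manual_seed(123)

gpt_model = model_provider()          # same architecture, now sharded four ways
gpt_model.to(torch.device("cuda"))
gpt_model = load_distributed_checkpoint(checkpoint_path='/workspace/ckpt', gpt_model=gpt_model)
```
No separate weight conversion step is needed; the distributed checkpointing machinery re-shards the tensors at load time.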
<br>
### Extending Further
The above example introduced you to a basic training loop in MCore. To see more advanced examples, please look at [pretrain_gpt.py](https://github.com/NVIDIA/Megatron-LM/blob/main/pretrain_gpt.py). It shows how to write more complex training loops involving pipeline parallelism, context parallelism, RoPE embeddings, mixture of experts, and the other functionality present in MCore.
# Megatron-Core
Megatron-Core is an open-source PyTorch-based library that contains GPU-optimized techniques and cutting-edge system-level optimizations. It abstracts them into composable and modular APIs, allowing full flexibility for developers and model researchers to train custom transformers at-scale on NVIDIA accelerated computing infrastructure. This library is compatible with all NVIDIA Tensor Core GPUs, including FP8 acceleration support for [NVIDIA Hopper architectures](https://www.nvidia.com/en-us/data-center/technologies/hopper-architecture/).
Megatron-Core offers core building blocks such as attention mechanisms, transformer blocks and layers, normalization layers, and embedding techniques. Additional functionality such as activation recomputation and distributed checkpointing is also natively built into the library. The building blocks and functionality are all GPU optimized and can be combined with advanced parallelization strategies for optimal training speed and stability on NVIDIA Accelerated Computing Infrastructure. The library also includes advanced model parallelism techniques (tensor, sequence, pipeline, context, and MoE expert parallelism).
Megatron-Core can be used with [NVIDIA NeMo](https://www.nvidia.com/en-us/ai-data-science/products/nemo/), an enterprise-grade AI platform. Alternatively, you can explore Megatron-Core with the native PyTorch training loop [here](https://github.com/NVIDIA/Megatron-LM/tree/main/examples). Visit [Megatron-Core documentation](https://docs.nvidia.com/megatron-core/developer-guide/latest/index.html) to learn more.
## Quick links
- [Benchmark using NVIDIA NeMo](https://docs.nvidia.com/nemo-framework/user-guide/latest/overview.html#performance-benchmarks)
- [Multimodal example (LLaVA training pipeline)](https://github.com/NVIDIA/Megatron-LM/tree/main/examples/multimodal)
- [Mixture-of-Experts](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/transformer/moe)
- [Training Mamba-based Language Models](https://github.com/NVIDIA/Megatron-LM/tree/main/examples/mamba)
## StragglerDetector for a TP Group
The file `megatron/core/utils.py` has a class named `StragglerDetector` which can be used as a Python context manager.
It can be used to find a straggling TP group based on the RTT of the ranks in the TP group. It also collects
power/temperature/utilization for GPUs, which can additionally be used to narrow down to the exact GPU in the TP group,
assuming the straggling was caused by a hardware anomaly in a given GPU.<br>
This class supports collecting timing events for various steps of a given iteration. It
keeps collecting such timing events on a per-rank basis, and when the reporter is invoked
during a logging interval, it computes the min and max of each metric across all
ranks and logs the observed value together with the rank, as follows:
```
0: INFO:megatron.core.utils:[2024-03-14 23:07:56] | MnRtt/Rnk: 3453.08ms/8 | MxRtt/Rnk: 3468.20ms/0 | MnPwr/Rnk: 601796W/8 | MxPwr/Rnk: 683801W/18 | MnTmp/Rnk: 52C/0 | MxTmp/Rnk: 65C/21 | MnUtl/Rnk: 97%/8 | MxUtl/Rnk: 100%/6 | MnClk/Rnk: 1950MHz/28 | MxClk/Rnk: 1980MHz/0 | MnDRtt/Rnk: 14.27ms/23 | MxDRtt/Rnk: 34.65ms/3 | MnEtpt/Rnk: 296.02TF/0 | MxEtpt/Rnk: 297.32TF/8
```
<hr>
### Description of the metrics
Each metric is prefixed with `Mn` or `Mx` to represent `Minimum` or `Maximum`. Each metric is also suffixed with the rank where the metric was measured. The metrics are averaged over the logging interval. Between the prefix and the rank is the name of the metric, as follows:
- Rtt : RoundTrip Time (time spent in all the traced ops per iteration)
- Pwr : GPU Power
- Tmp : GPU Temperature
- Utl : GPU Utilization
- Clk : GPU Clock
- DRtt: get_batch latency
- Etpt: Estimated throughput. This is derived from the computed FLOPs divided by Rtt. Since we do not collect timing for the backward pass, the value is further divided by three to arrive at the estimated throughput (see the sketch below).
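As a rough illustration of how to read the reported value (an approximation, not the exact library code; the numbers below are made up to loosely match the sample log above):
```
# Etpt ≈ per-iteration FLOPs / amp / Rtt, with amp = 3.0 when only the forward pass is timed
flops_per_iteration = 3.06e15   # hypothetical value accumulated via get_computed_flops()
amp = 3.0                       # backward pass is not timed, so divide by three
rtt_seconds = 3.45              # measured Rtt (~3453 ms in the sample log)
etpt_tflops = flops_per_iteration / amp / rtt_seconds / 1e12
print(f"Estimated throughput: {etpt_tflops:.2f} TF")   # ~295 TF
```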
<hr>
### Command Line activation
To start using the StragglerDetector, pass the argument `--log-straggler` (the feature is disabled by default). The following optional arguments are also available:
- `--disable-straggler-on-startup` - keep the StragglerDetector disabled on startup and enable it later; by default the detector is enabled on startup.
- `--straggler-ctrlr-port` - The StragglerDetector can be toggled on/off by sending `curl Rank0Host:port`. Default port is 65535.
- `--straggler-minmax-count` - If set to > 1 (N), it prints the N top and bottom Etpt/Rank pairs as shown below
```
0: INFO:megatron.core.utils:^^^^ Bottom 4 Ranks with lowest Etpt(TF): 296.02/0, 296.17/2, 296.23/1, 296.23/4,
0: INFO:megatron.core.utils:^^^^ Top 4 Ranks with highest Etpt(TF): 297.28/15, 297.28/11, 297.32/12, 297.32/8,
```
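For example, a hypothetical pretraining launch (everything except the straggler flags is a placeholder for your usual arguments) could enable the detector like this:
```
# <other args> stands in for the usual model, data, and training arguments
torchrun --nproc-per-node 8 pretrain_gpt.py <other args> \
    --log-straggler \
    --straggler-minmax-count 4
```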
<hr>
### Programming the StragglerDetector
The StragglerDetector class can be used as a context manager, and its implementation is a singleton.
- Initialization
```
# initialization, where StragglerDetector will be used
from megatron.core.utils import StragglerDetector
stimer = StragglerDetector()
```
- One time for each rank
```
# one time before the training loop starts
stimer.configure(world, rank, enabled=True, port=65535)
# Arguments to configure
# world : World Size
# rank : The rank of this trainer
# mmcnt : (Optional) Number of ranks to print for showing Min/Max Etpt
# amp : (Optional) Set to 3.0 if we only use timers in fwd pass
# port : (Optional) control port, useful only for rank-0
# prefill : (Optional) how many Events to pre-populate
# enabled : (Optional) whether or not collection is enabled on startup
```
- To Capture time
```
# wherever timing needs to be captured
with stimer:
do_operation()
# special case for get_batch
with stimer(bdata=True):
input,... = get_batch(iterator,...)
```
- Logging in main training loop
```
# logging
total_flops = 0.0
iteration = 0
# inside the main training loop
while training:
iteration += 1
do_step()
total_flops += get_computed_flops()
if iteration % log_interval == 0:
stimer.report(total_flops, log_interval)
total_flops = 0.0
```
import megatron.core.tensor_parallel
import megatron.core.utils
from megatron.core import parallel_state
from megatron.core.distributed import DistributedDataParallel
from megatron.core.inference_params import InferenceParams
from megatron.core.model_parallel_config import ModelParallelConfig
from megatron.core.num_microbatches_calculator import init_num_microbatches_calculator
from megatron.core.package_info import (
__contact_emails__,
__contact_names__,
__description__,
__download_url__,
__homepage__,
__keywords__,
__license__,
__package_name__,
__repository_url__,
__shortversion__,
__version__,
)
from megatron.core.timers import Timers
# Alias parallel_state as mpu, its legacy name
mpu = parallel_state
__all__ = [
"parallel_state",
"tensor_parallel",
"utils",
"DistributedDataParallel",
"InferenceParams",
"init_num_microbatches_calculator",
"ModelParallelConfig",
"Timers",
]
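# Builds the C++ dataset helpers as a pybind11 extension module. Typically compiled
# at runtime (e.g. via megatron.core.datasets.utils.compile_helpers()), or by running
# `make` in this directory.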
CXXFLAGS += -O3 -Wall -shared -std=c++11 -fPIC -fdiagnostics-color
CPPFLAGS += $(shell python3 -m pybind11 --includes)
LIBNAME = helpers
LIBEXT = $(shell python3-config --extension-suffix)
default: $(LIBNAME)$(LIBEXT)
%$(LIBEXT): %.cpp
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $< -o $@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import numpy
from megatron.core.datasets.indexed_dataset import IndexedDataset
from megatron.core.datasets.masked_dataset import (
MaskedWordPieceDataset,
MaskedWordPieceDatasetConfig,
)
from megatron.core.datasets.utils import Split
@dataclass
class BERTMaskedWordPieceDatasetConfig(MaskedWordPieceDatasetConfig):
"""Configuration object for Megatron Core BERT WordPiece datasets"""
classification_head: bool = None
"""Option to perform the next sequence prediction during sampling"""
def __post_init__(self) -> None:
"""Do asserts and set fields post init
"""
super().__post_init__()
assert self.classification_head is not None
class BERTMaskedWordPieceDataset(MaskedWordPieceDataset):
"""The BERT dataset that assumes WordPiece tokenization
Args:
indexed_dataset (IndexedDataset): The IndexedDataset around which to build the MegatronDataset
dataset_path (str): The real path on disk to the dataset, for bookkeeping
indexed_indices (numpy.ndarray): The set of the documents indices to expose
num_samples (Optional[int]): The number of samples to draw from the indexed dataset. When None, build as many samples as correspond to one epoch.
index_split (Split): The indexed_indices Split
config (BERTMaskedWordPieceDatasetConfig): The config
"""
def __init__(
self,
indexed_dataset: IndexedDataset,
dataset_path: str,
indexed_indices: numpy.ndarray,
num_samples: Optional[int],
index_split: Split,
config: BERTMaskedWordPieceDatasetConfig,
) -> None:
super().__init__(
indexed_dataset, dataset_path, indexed_indices, num_samples, index_split, config
)
self.token_lookup = list(self.config.tokenizer.inv_vocab.keys())
# Account for the single <cls> and two <sep> token ids
self.sample_index = self._build_sample_index(
self.config.sequence_length - 3, 2 if self.config.classification_head else 1
)
@staticmethod
def _key_config_attributes() -> List[str]:
"""Inherited method implementation
Returns:
List[str]: The key config attributes
"""
return super(
BERTMaskedWordPieceDataset, BERTMaskedWordPieceDataset
)._key_config_attributes() + ["classification_head",]
def __getitem__(self, idx: int) -> Dict[str, Union[int, numpy.ndarray]]:
"""Abstract method implementation
Args:
idx (int): The index into the dataset
Returns:
Dict[str, Union[int, numpy.ndarray]]: The sample, a dictionary of token ids, segment types, labels, and masks
"""
idx_beg, idx_end, target_sequence_length = self.sample_index[idx]
sample = [self.dataset[i] for i in range(idx_beg, idx_end)]
numpy_random_state = numpy.random.RandomState(
seed=(self.config.random_seed + idx) % 2 ** 32
)
assert target_sequence_length <= self.config.sequence_length
# Split the sample into contiguous subsegments A and B
pivot = len(sample)
is_next_random = False
if self.config.classification_head:
assert len(sample) > 1, "the sample must contain at least two sentences"
pivot = 1
if len(sample) >= 3:
pivot = numpy_random_state.randint(low=1, high=len(sample))
is_next_random = numpy_random_state.random() < 0.5
split_A = []
for sample_a in sample[:pivot]:
split_A.extend(sample_a)
split_B = []
for sample_b in sample[pivot:]:
split_B.extend(sample_b)
if is_next_random:
split_A, split_B = split_B, split_A
# Trim the subsegments from either end to a desired joint length
length_A = len(split_A)
length_B = len(split_B)
if length_A + length_B <= target_sequence_length:
truncated = False
else:
while length_A + length_B > target_sequence_length:
split = split_A if length_A > length_B else split_B
if numpy_random_state.random() < 0.5:
del split[0]
else:
del split[-1]
length_A = len(split_A)
length_B = len(split_B)
truncated = True
# Merge the subsegments and create the token assignment labels
tokens = [
self.config.tokenizer.cls,
*split_A,
self.config.tokenizer.sep,
]
assignments = [0 for _ in range(1 + len(split_A) + 1)]
if split_B:
tokens += [*split_B, self.config.tokenizer.sep]
assignments += [1 for _ in range(len(split_B) + 1)]
# Masking
tokens, masked_positions, masked_labels, _, _ = self._create_masked_lm_predictions(
tokens, target_sequence_length, numpy_random_state
)
# Pad the sequences and convert to NumPy
length_toks = len(tokens)
length_pads = self.config.sequence_length - length_toks
assert length_pads >= 0
tokens = numpy.array(tokens, dtype=numpy.int64)
tokens = numpy.pad(tokens, (0, length_pads), constant_values=self.config.tokenizer.pad)
assignments = numpy.array(assignments, dtype=numpy.int64)
assignments = numpy.pad(
assignments, (0, length_pads), constant_values=self.config.tokenizer.pad
)
# Get the padding mask
mask_pads = numpy.ones(length_toks, dtype=numpy.int64)
mask_pads = numpy.pad(
mask_pads, (0, length_pads), constant_values=self.config.tokenizer.pad
)
# Mask the labels
labels = numpy.zeros(self.config.sequence_length, dtype=numpy.int64) - 1
labels[masked_positions] = masked_labels
# Get the loss mask
mask_loss = numpy.zeros(self.config.sequence_length, dtype=numpy.int64)
mask_loss[masked_positions] = 1
return {
"text": tokens,
"types": assignments,
"labels": labels,
"is_random": int(is_next_random),
"padding_mask": mask_pads,
"loss_mask": mask_loss,
"truncated": int(truncated),
}
def _get_token_mask(self, numpy_random_state: numpy.random.RandomState) -> Optional[int]:
"""Abstract method implementation
80% of the time, replace the token id with mask token id. 10% of the time, replace token id
with a random token id from the vocabulary. 10% of the time, do nothing.
Args:
numpy_random_state (RandomState): The NumPy random state
Returns:
Optional[int]: The replacement token id or None
"""
if numpy_random_state.random() < 0.8:
return self.config.tokenizer.mask
else:
if numpy_random_state.random() >= 0.5:
return self.token_lookup[numpy_random_state.randint(0, len(self.token_lookup))]
return None
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
import hashlib
import json
import logging
import os
import time
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple, Union
import numpy
import torch
from megatron.core.datasets.blended_megatron_dataset_config import BlendedMegatronDatasetConfig
from megatron.core.datasets.megatron_dataset import MegatronDataset
from megatron.core.datasets.utils import normalize
from megatron.core.utils import log_single_rank
logger = logging.getLogger(__name__)
_VERBOSE = False
class BlendedDataset(torch.utils.data.Dataset):
"""Conjugating class for a set of MegatronDataset instances
Args:
datasets (List[MegatronDataset]): The MegatronDataset instances to blend
weights (List[Union[int, float]]): The weights that determine the dataset blend ratios
size (Optional[int]): The number of samples to draw from the blend. If None, for each dataset index idx draw exactly weights[idx] samples from datasets[idx].
config (BlendedMegatronDatasetConfig): The config
Raises:
RuntimeError: When the dataset has fewer or more samples than 'size' post-initialization
"""
def __init__(
self,
datasets: List[MegatronDataset],
weights: List[Union[int, float]],
size: Optional[int],
config: BlendedMegatronDatasetConfig,
) -> None:
assert len(datasets) == len(weights)
assert len(datasets) < 32767
assert all(map(lambda _: type(_) == type(datasets[0]), datasets))
assert all(map(lambda _: _.index_split == datasets[0].index_split, datasets))
assert all(map(lambda _: _ > 0, weights))
assert all(map(lambda _: type(_) == type(weights[0]), weights))
if size is None and isinstance(weights[0], float):
assert all(map(lambda _: _ == int(_), weights))
# Alert user to unnecessary blending
if len(datasets) == 1:
log_single_rank(
logger, logging.WARNING, f"Building a BlendedDataset for a single MegatronDataset"
)
if size is not None:
weights = normalize(weights)
self.datasets = datasets
self.split = self.datasets[0].index_split
self.weights = weights
self.size = size
self.config = config
unique_identifiers = OrderedDict()
unique_identifiers["class"] = type(self).__name__
unique_identifiers["datasets"] = [dataset.unique_identifiers for dataset in self.datasets]
unique_identifiers["split"] = self.split.name
unique_identifiers["weights"] = self.weights
unique_identifiers["size"] = self.size
self.unique_description = json.dumps(
unique_identifiers, indent=4, default=lambda obj: obj.unique_identifiers
)
self.unique_description_hash = hashlib.md5(
self.unique_description.encode("utf-8")
).hexdigest()
self.built_anew_on_cache_miss = False
self.dataset_index, self.dataset_sample_index = self._build_indices()
def __len__(self) -> int:
return self.dataset_index.shape[0]
def __getitem__(self, idx: int) -> Dict[str, Union[int, numpy.ndarray]]:
dataset_id = self.dataset_index[idx]
dataset_sample_id = self.dataset_sample_index[idx]
return {
"dataset_id": dataset_id,
**self.datasets[dataset_id][dataset_sample_id],
}
def _build_indices(self) -> Tuple[numpy.ndarray, numpy.ndarray]:
"""Build and optionally cache the dataset index and the dataset sample index
The dataset index is a 1-D mapping which determines the dataset to query. The dataset
sample index is a 1-D mapping which determines the sample to request from the queried
dataset.
Returns:
Tuple[numpy.ndarray, numpy.ndarray]: The dataset index and the dataset sample index
"""
path_to_cache = self.config.path_to_cache
if path_to_cache:
get_path_to = lambda suffix: os.path.join(
path_to_cache,
f"{self.unique_description_hash}-{type(self).__name__}-{self.split.name}-{suffix}",
)
path_to_description = get_path_to("description.txt")
path_to_dataset_index = get_path_to("dataset_index.npy")
path_to_dataset_sample_index = get_path_to("dataset_sample_index.npy")
cache_hit = all(
map(
os.path.isfile,
[path_to_description, path_to_dataset_index, path_to_dataset_sample_index],
)
)
else:
cache_hit = False
if not path_to_cache or (not cache_hit and torch.distributed.get_rank() == 0):
log_single_rank(
logger,
logging.INFO,
f"Build and save the {type(self).__name__} indices",
)
self.built_anew_on_cache_miss = True
# Build the dataset and dataset sample indexes
log_single_rank(
logger, logging.INFO, f"\tBuild and save the dataset and dataset sample indexes"
)
t_beg = time.time()
from megatron.core.datasets import helpers
if self.size is not None:
dataset_index = numpy.zeros(self.size, dtype=numpy.int16)
dataset_sample_index = numpy.zeros(self.size, dtype=numpy.int64)
helpers.build_blending_indices(
dataset_index,
dataset_sample_index,
self.weights,
len(self.datasets),
self.size,
_VERBOSE,
)
else:
size = sum(self.weights)
dataset_index = numpy.zeros(size, dtype=numpy.int16)
dataset_sample_index = numpy.zeros(size, dtype=numpy.int64)
helpers.build_exhaustive_blending_indices(
dataset_index, dataset_sample_index, self.weights, len(self.datasets)
)
if path_to_cache:
os.makedirs(path_to_cache, exist_ok=True)
# Write the description
with open(path_to_description, "wt") as writer:
writer.write(self.unique_description)
# Save the indexes
numpy.save(path_to_dataset_index, dataset_index, allow_pickle=True)
numpy.save(path_to_dataset_sample_index, dataset_sample_index, allow_pickle=True)
else:
log_single_rank(
logger,
logging.WARNING,
f"Unable to save the {type(self).__name__} indexes because path_to_cache is None",
)
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
return dataset_index, dataset_sample_index
log_single_rank(logger, logging.INFO, f"Load the {type(self).__name__} indices")
log_single_rank(
logger, logging.INFO, f"\tLoad the dataset index from {path_to_dataset_index}"
)
t_beg = time.time()
dataset_index = numpy.load(path_to_dataset_index, allow_pickle=True, mmap_mode='r')
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
log_single_rank(
logger,
logging.INFO,
f"\tLoad the dataset sample index from {path_to_dataset_sample_index}",
)
t_beg = time.time()
dataset_sample_index = numpy.load(
path_to_dataset_sample_index, allow_pickle=True, mmap_mode='r'
)
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
return dataset_index, dataset_sample_index
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
import logging
import math
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List, Optional, Type, Union
import numpy
import torch
from megatron.core.datasets.blended_dataset import BlendedDataset
from megatron.core.datasets.blended_megatron_dataset_config import BlendedMegatronDatasetConfig
from megatron.core.datasets.megatron_dataset import LowLevelDataset, MegatronDataset
from megatron.core.datasets.utils import Split, normalize
from megatron.core.parallel_state import get_virtual_pipeline_model_parallel_rank
from megatron.core.utils import log_single_rank
logger = logging.getLogger(__name__)
MidLevelDataset = MegatronDataset
TopLevelDataset = Union[BlendedDataset, MidLevelDataset]
DistributedDataset = Union[
TopLevelDataset, MidLevelDataset, LowLevelDataset, torch.utils.data.Dataset
]
class BlendedMegatronDatasetBuilder(object):
"""Builder class for the BlendedDataset and MegatronDataset classes
Args:
cls (Type[MegatronDataset]): The class to instantiate, must inherit from MegatronDataset
sizes (List[Optional[int]]): The minimum total number of samples to draw, or None, per split
is_built_on_rank (Callable): A callable which returns True if the dataset should be built on the current rank and False otherwise. It should be Megatron Core parallelism aware i.e. global rank, local group rank, and virtual rank may inform its return value.
config (BlendedMegatronDatasetConfig): The config object which informs dataset creation
"""
def __init__(
self,
cls: Type[MidLevelDataset],
sizes: List[int],
is_built_on_rank: Callable,
config: BlendedMegatronDatasetConfig,
):
self.cls = cls
self.sizes = sizes
self.is_built_on_rank = is_built_on_rank
self.config = config
log_single_rank(
logger,
logging.INFO,
f"Building dataset splits with cls={cls.__name__}, sizes={self.sizes}, and config={self.config}",
)
if not self.config.mock:
for split in Split:
size_is_none = self.sizes[split.value] is None
if self.config.blend_per_split is None:
weights_are_none = self.config.blend[1] is None
else:
if self.config.blend_per_split[split.value] is None:
continue
weights_are_none = self.config.blend_per_split[split.value][1] is None
if size_is_none:
assert (
weights_are_none
), f"size_is_none => weights_are_none fails for {split.name} split"
if torch.distributed.is_initialized():
gb_rank = torch.distributed.get_rank()
vp_rank = get_virtual_pipeline_model_parallel_rank()
if gb_rank == 0 and (vp_rank == 0 or vp_rank is None):
assert (
self.is_built_on_rank()
), "is_built_on_rank must return True when global rank = 0 and vp rank = 0"
def build(self) -> List[Optional[TopLevelDataset]]:
"""Build all dataset splits according to the provided blend(s)
This method is distributed-aware and must be called on all ranks.
The dataset splits returned can vary according to the config. Supply config.blend and
config.split to build BlendedDataset and/or MegatronDataset splits from the same
distribution. Supply config.blend_per_split to build BlendedDataset and/or MegatronDataset
splits from separate distributions. In either case, for each split, handle the following
cases:
(1) The split is None
- do nothing
(2) The split has one contributing dataset, and...
(a) 'size' is not None
- Build a mid-level dataset with low-level dataset sampling in proportion to the size
(b) 'size' is None
- Build mid-level datasets with no excess low-level dataset sampling
(3) The split has multiple contributing datasets, and...
(a) 'weights' is not None and 'size' is not None
- Build mid-level datasets with low-level dataset sampling in proportion to their weights and the size
- Build a top-level dataset of length marginally greater than 'size' with mid-level dataset sampling in proportion to their weights and the size
(b) 'weights' is not None and 'size' is None
- Error
(c) 'weights' is None and 'size' is not None
- Build mid-level datasets with no excess low-level dataset sampling
- Build a top-level dataset of length 'size' with mid-level dataset sampling in proportion to their lengths and the size
- The 'size' of the top-level dataset is capped at the sum of the mid-level dataset lengths
(d) 'weights' is None and 'size' is None
- Build mid-level datasets with no excess low-level dataset sampling
- Build a top-level dataset with no excess mid-level dataset sampling
Returns:
List[Optional[TopLevelDataset]]: A list containing a dataset instance (or None) per split
"""
datasets = self._build_blended_dataset_splits()
for dataset in datasets:
if dataset is not None and len(dataset) > 0:
if isinstance(dataset, BlendedDataset):
if dataset.built_anew_on_cache_miss or any(
x.built_anew_on_cache_miss for x in dataset.datasets
):
log_single_rank(
logger,
logging.INFO,
f"Verifying NumPy indices for {type(dataset).__name__} {dataset.split.name} split",
)
else:
log_single_rank(
logger,
logging.INFO,
f"NumPy indices for {type(dataset).__name__} {dataset.split.name} split are fully cached, skipping verification",
)
continue
# Check blend size
assert dataset.size is None or dataset.size == dataset.dataset_index.shape[0]
# Check blend access of mid-level datasets
_, sizes = numpy.unique(dataset.dataset_index, return_counts=True)
for i, dataset_and_size in enumerate(zip(dataset.datasets, sizes)):
if len(dataset_and_size[0]) < dataset_and_size[1]:
raise IndexError(
f"{type(dataset).__name__} blend goes out of bounds for {type([dataset_and_size[0]]).__name__} {i} for {dataset.split.name} split"
)
return datasets
def _build_blended_dataset_splits(
self,
) -> List[Optional[TopLevelDataset]]:
"""Build all dataset splits according to the provided blend(s)
See the BlendedMegatronDatasetBuilder.build alias for more information.
Returns:
List[Optional[TopLevelDataset]]: A list containing a dataset instance (or None) per split
"""
##
# Return fake "mock" datasets
##
if self.config.mock:
split = self.config.split_matrix
try:
return self._build_megatron_dataset_splits(None, split, self.sizes)
except Exception as error:
raise Exception(
f"{self.cls.__name__} failed to build as a mock data generator"
) from error
##
# All splits come from the same distribution
##
elif self.config.blend:
prefixes, weights = self.config.blend
if weights is not None:
weights = normalize(weights)
split = self.config.split_matrix
# Blend consists of a single prefix
if len(prefixes) == 1 and weights is None:
return self._build_megatron_dataset_splits(prefixes[0], split, self.sizes)
# Build the mid-level datasets
if weights is None:
sizes_per_dataset = [[None for split in Split] for prefix in prefixes]
else:
sizes_per_dataset = _get_size_per_split_per_dataset(weights, self.sizes)
# build each dataset in parallel
megatron_datasets = self._build_megatron_datasets_parallel(
prefixes, split, sizes_per_dataset
)
# Build the top-level datasets
blended_datasets = [None] * len(Split)
for i in range(len(Split)):
if split[i] is not None:
weights_i = weights
if weights_i is not None and self.sizes[i] is not None:
size_i = sum(list(zip(*sizes_per_dataset))[i])
elif weights_i is None:
try:
weights_i = [
len(megatron_dataset) for megatron_dataset in megatron_datasets[i]
]
except TypeError:
weights_i = [0 for _ in prefixes]
if self.sizes[i] is not None:
size_i = min(self.sizes[i], sum(weights_i))
else:
size_i = None # => the size will be sum(weights_i)
else:
raise RuntimeError
blended_datasets[i] = self.build_generic_dataset(
BlendedDataset,
self.is_built_on_rank,
True, # synchronize_ranks, default behavior to build on rank-0 first
megatron_datasets[i],
weights_i,
size_i,
self.config,
)
return blended_datasets
##
# Each split comes from a separate distribution
##
else:
blended_datasets = [None] * len(Split)
for i in range(len(Split)):
split_spoof = [None] * len(Split)
split_spoof[i] = (0.0, 1.0)
sizes_spoof = [0] * len(Split)
sizes_spoof[i] = self.sizes[i]
# Blend is provided for the split
blend = self.config.blend_per_split[i]
if blend is not None:
prefixes, weights = blend
if weights is not None:
weights = normalize(weights)
# Blend consists of a single prefix
if len(prefixes) == 1:
blended_datasets[i] = self._build_megatron_dataset_splits(
prefixes[0], split_spoof, sizes_spoof
)[i]
continue
# Build mid-level datasets
if weights is None:
sizes_per_dataset = [[None for split in Split] for prefix in prefixes]
else:
sizes_per_dataset = _get_size_per_split_per_dataset(weights, sizes_spoof)
# build each dataset in parallel
megatron_datasets = self._build_megatron_datasets_parallel(
prefixes, split_spoof, sizes_per_dataset
)[i]
# Build top-level dataset
if weights is not None and self.sizes[i] is not None:
size = list(map(sum, zip(*sizes_per_dataset)))[i]
elif weights is None:
try:
weights = [
len(megatron_dataset) for megatron_dataset in megatron_datasets
]
except TypeError:
weights = [0 for _ in prefixes]
if self.sizes[i] is not None:
size = min(self.sizes[i], sum(weights))
else:
size = None # => the size will be sum(weights)
else:
raise RuntimeError
blended_datasets[i] = self.build_generic_dataset(
BlendedDataset,
self.is_built_on_rank,
True, # synchronize_ranks, default behavior to build on rank-0 first
megatron_datasets,
weights,
size,
self.config,
)
return blended_datasets
def _build_megatron_datasets_parallel(
self,
prefixes: List[str],
split: List[float],
sizes_per_dataset: List[List[int]],
) -> List[List[Optional[MegatronDataset]]]:
"""Build the megatron datasets for a list of prefixes in parallel
Args:
prefixes (List[str]): The list of prefix strings
split (List[float]): The dataset split ratios (must sum to 1.00)
sizes_per_dataset (List[List[int]]): The number of samples to request
per MegatronDataset per split
Returns:
List[List[Optional[MegatronDataset]]]: For each split, have a list of
MegatronDataset per prefix
"""
# Helper function to wrap the threading logic
def _threading_helper(
megatron_datasets: List[List[Optional[MegatronDataset]]],
num_workers: int,
prefixes: List[str],
split: List[float],
sizes_per_dataset: List[List[int]],
) -> None:
with ThreadPoolExecutor(max_workers=num_workers) as executor:
all_futures = []
for i in range(len(prefixes)):
all_futures.append(
executor.submit(
self._build_megatron_dataset_splits,
prefixes[i],
split,
sizes_per_dataset[i],
False, # synchronize_ranks, barrier is called in this function
)
)
for future in all_futures:
try:
megatron_datasets_split = future.result()
for j in range(len(megatron_datasets_split)):
megatron_datasets[j].append(megatron_datasets_split[j])
except Exception as err:
raise err
megatron_datasets = [[] for _ in range(len(Split))]
num_dataset_builder_threads = self.config.num_dataset_builder_threads
if torch.distributed.is_initialized():
rank = torch.distributed.get_rank()
# First, build on rank 0
if rank == 0:
num_workers = num_dataset_builder_threads
if num_workers > 1:
# since only rank 0 is running, scale up the thread count
# but not too much to avoid overloading storage on miss path.
# if user set num_dataset_builder_threads to 1,
# i.e. meant for serial build, do not scale up.
num_workers *= min(2, max(1, torch.cuda.device_count()))
_threading_helper(
megatron_datasets,
num_workers,
prefixes,
split,
sizes_per_dataset,
)
torch.distributed.barrier()
# Then, build on other ranks; guaranteed to be data_cache hit
if rank != 0:
_threading_helper(
megatron_datasets,
num_dataset_builder_threads,
prefixes,
split,
sizes_per_dataset,
)
else:
_threading_helper(
megatron_datasets,
num_dataset_builder_threads,
prefixes,
split,
sizes_per_dataset,
)
return megatron_datasets
def _build_megatron_dataset_splits(
self,
dataset_path: Optional[str],
split: List[float],
sizes: List[int],
synchronize_ranks: bool = True,
) -> List[Optional[MidLevelDataset]]:
"""Build each MidLevelDataset split from a single LowLevelDataset
Args:
dataset_path (Optional[str]): The path on disk which defines the underlying LowLevelDataset, or None for mock dataset classes
split (List[Tuple[float, float]]): The dataset split matrix
sizes (List[int]): The number of total samples to draw from each split
synchronize_ranks (bool): Whether to call barrier for rank-0 / barrier / other-ranks behavior. Set to False when we enforce this behavior at higher level.
Returns:
List[Optional[MidLevelDataset]]: The MidLevelDataset (or None) per split
"""
# short-cut if we are not building on this rank
if torch.distributed.is_initialized() and not self.is_built_on_rank():
for i in range(len(Split)):
if split[i] is not None and synchronize_ranks:
torch.distributed.barrier()
return [None] * len(Split)
# Build the low level dataset
low_level_dataset = self.cls.build_low_level_dataset(dataset_path, self.config)
# Build the split indices for the low level dataset
num_elements = self.cls.numel_low_level_dataset(low_level_dataset)
split_indices = []
for i, _ in enumerate(Split):
if split[i] is not None:
beg = int(round(split[i][0] * float(num_elements)))
end = int(round(split[i][1] * float(num_elements)))
split_indices.append(numpy.arange(start=beg, stop=end, step=1, dtype=numpy.int32))
else:
split_indices.append(None)
# Build the mid level dataset
mid_level_datasets = []
for i, _split in enumerate(Split):
if split[i] is None:
mid_level_datasets.append(None)
else:
mid_level_datasets.append(
self.build_generic_dataset(
self.cls,
self.is_built_on_rank,
synchronize_ranks,
low_level_dataset,
dataset_path,
split_indices[i],
sizes[i],
_split,
self.config,
)
)
return mid_level_datasets
@staticmethod
def build_generic_dataset(
cls: Union[Type[DistributedDataset], Callable],
is_built_on_rank: Callable,
synchronize_ranks: bool,
*args: Any,
) -> Optional[Union[DistributedDataset, Iterable]]:
"""Build the DistributedDataset
Return None if and only if the underlying dataset class is not built on the current rank
and torch.distributed is initialized.
Args:
cls (Union[Type[DistributedDataset], Callable]): The DistributedDataset class to be built. In special cases, e.g. when we are building the low level dataset for a RawMegatronDataset instance, we can accept a Callable which returns an Iterable.
synchronize_ranks (bool): Whether to call barrier for rank-0 / barrier / other-ranks behavior. Set to False when we enforce this behavior at higher level.
args (Tuple[Any]): The positional arguments used to build the provided DistributedDataset class
Raises:
Exception: When the dataset constructor raises an OSError
Returns:
Optional[Union[DistributedDataset, Iterable]]: The DistributedDataset instantion, the Iterable instantiation, or None
"""
if torch.distributed.is_initialized():
rank = torch.distributed.get_rank()
dataset = None
# First, build on rank 0
if rank == 0 and is_built_on_rank():
try:
dataset = cls(*args)
except OSError as err:
log = (
f"Failed to write dataset materials to the data cache directory. "
+ f"Please supply a directory to which you have write access via "
+ f"the path_to_cache attribute in BlendedMegatronDatasetConfig and "
+ f"retry. Refer to the preserved traceback above for more information."
)
raise Exception(log) from err
if synchronize_ranks:
torch.distributed.barrier()
# After, build on other ranks
if rank != 0 and is_built_on_rank():
dataset = cls(*args)
return dataset
return cls(*args)
def _get_size_per_split_per_dataset(
normalized_weights: List[float], target_size_per_split: List[int]
) -> List[List[int]]:
"""Determine the contribution of the MegatronDataset splits to the BlendedDataset splits
Args:
normalized_weights (List[float]): e.g. [0.3, 0.7]
target_size_per_split (List[int]): The number of samples to target for each BlendedDataset split
Returns:
List[List[int]]: The number of samples to request per MegatronDataset per split
"""
assert numpy.isclose(sum(normalized_weights), 1.0)
# Use 0.5% target margin to ensure we satiate the request
sizes_per_dataset = [
[int(math.ceil(target_size * weight * 1.005)) for target_size in target_size_per_split]
for weight in normalized_weights
]
return sizes_per_dataset
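# Illustration (not part of the library): the 0.5% head-room means each request is
# ceil(target * weight * 1.005). For example:
#
#   >>> _get_size_per_split_per_dataset([0.3, 0.7], [1000, 100, 10])
#   [[302, 31, 4], [704, 71, 8]]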
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
import functools
import logging
import re
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from megatron.core.datasets.megatron_tokenizer import MegatronTokenizer
from megatron.core.datasets.utils import Split, log_single_rank, normalize
logger = logging.getLogger(__name__)
@dataclass
class BlendedMegatronDatasetConfig:
"""Configuration object for Megatron Core datasets"""
random_seed: int
"""The seed for all RNG during dataset creation."""
sequence_length: int
"""The sequence length."""
blend: Optional[Tuple[List[str], Optional[List[float]]]] = None
"""The blend, consisting of a list of dataset prefixes and optionally a list of dataset
weights. For example, [["dataset-path1", "dataset-path2"], [0.3, 0.7]]. When the weights are
None, they are inferred from the lengths of the contributing datasets. Not to be used with
'blend_per_split'. Defaults to None.
"""
blend_per_split: Optional[List[Optional[Tuple[List[str], Optional[List[float]]]]]] = None
"""A set of blends, as defined above, one for each split distribution. Not to be used with
'blend'. Defaults to None.
"""
split: Optional[str] = None
"""The split string, a comma separated weighting for the dataset splits when drawing samples
from a single distribution. Not to be used with 'blend_per_split'. Defaults to None.
"""
split_matrix: Optional[List[Tuple[float, float]]] = field(init=False, default=None)
"""The split matrix consisting of non-overlapping book-ends of each split in order. For more
information, refer to 'convert_split_vector_to_split_matrix'. Created automatically from
'split'. Not to be passed in to the constructor.
"""
num_dataset_builder_threads: int = 1
"""The number of threads to use for dataset building."""
path_to_cache: Optional[str] = None
"""Where all re-useable dataset indices are to be cached."""
mmap_bin_files: bool = True
"""Whether to mmap the .bin files or use file pointers."""
mock: bool = field(init=False, default=False)
"""Whether to bypass real data loading and validation in favor of mock data generation.
Created automatically from 'blend' and 'blend_per_split'. Not to be passed in to the
constructor.
"""
tokenizer: Optional[MegatronTokenizer] = None
"""The MegatronTokenizer instance or None. Required for datasets which do online tokenization."""
def __post_init__(self) -> None:
"""Do asserts and set fields post init
"""
if self.blend_per_split is not None and any(self.blend_per_split):
assert self.blend is None, "blend and blend_per_split are incompatible"
assert self.split is None, "split and blend_per_split are incompatible"
assert len(self.blend_per_split) == len(
Split
), f"blend_per_split must contain {len(Split)} blends"
for split in Split:
if self.blend_per_split[split.value] is None:
log_single_rank(
logger, logging.INFO, f"blend not provided for {split.name} split"
)
else:
assert self.blend_per_split[split.value][1] is None or len(
self.blend_per_split[split.value][0]
) == len(
self.blend_per_split[split.value][1]
), "blend per split prefixes and weights must be equal in number"
else:
if self.blend is not None:
assert self.blend[1] is None or len(self.blend[0]) == len(
self.blend[1]
), "blend prefixes and weights must be equal in number"
assert self.split is not None, "split must be provided when blend is not None"
else:
self.mock = True
log_single_rank(
logger,
logging.INFO,
f"Let mock = True, as both blend and blend_per_split are None",
)
self.split = "1,1,1"
log_single_rank(
logger,
logging.INFO,
f"Let split = {self.split}, an arbitrarily even split, as mock is True",
)
split_vector = parse_and_normalize_split(self.split)
self.split_matrix = convert_split_vector_to_split_matrix(split_vector)
log_single_rank(logger, logging.INFO, f"Let split_matrix = {self.split_matrix}")
def parse_and_normalize_split(split: str) -> List[float]:
"""Parse the dataset split ratios from a string
Args:
split (str): The train valid test split string e.g. "99,1,0"
Returns:
List[float]: The train valid test split ratios e.g. [0.99, 0.01, 0.0]
"""
split = list(map(float, re.findall(r"[.0-9]+", split)))
split = split + [0.0 for _ in range(len(Split) - len(split))]
assert len(split) == len(Split)
assert all(map(lambda _: _ >= 0.0, split))
split = normalize(split)
return split
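# Worked examples (illustrative, not part of the original source):
#   parse_and_normalize_split("98,2,0") -> [0.98, 0.02, 0.0]
#   parse_and_normalize_split("99,1")   -> [0.99, 0.01, 0.0]  # padded to len(Split) entries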
def convert_split_vector_to_split_matrix(
vector_a: List[float], vector_b: Optional[List[float]] = None
) -> List[Optional[Tuple[float, float]]]:
"""Build the split matrix from one or optionally two contributing split vectors.
Ex. a standard conversion:
[0.99, 0.01, 0.0] -> [(0, 0.99), (0.99, 1.0), None]
Ex. a conversion for Retro when Retro pretraining uses a [0.99, 0.01, 0.0] split and Retro
preprocessing used a [0.98, 0.02, 0.0] split:
[0.99, 0.01, 0.0], [0.98, 0.02, 0.0] -> [(0, 0.98), (0.99, 1.0), None]
Args:
vector_a (List[float]): The primary split vector
vector_b (Optional[List[float]]): An optional secondary split vector which constrains the primary split vector. Defaults to None.
Returns:
List[Tuple[float, float]]: The split matrix consisting of book-ends of each split in order
"""
if vector_b is None:
vector_b = vector_a
# [.900, .090, .010] -> [0.00, .900, .990, 1.00]
expansion_a = functools.reduce(lambda a, b: a + [a[len(a) - 1] + b], [[0], *vector_a])
expansion_b = functools.reduce(lambda a, b: a + [a[len(a) - 1] + b], [[0], *vector_b])
# [0.00, .900, .990, 1.00] -> [(0.00, .900), (.900, .990), (.990, 1.00)]
bookends_a = list(zip(expansion_a[:-1], expansion_a[1:]))
bookends_b = list(zip(expansion_b[:-1], expansion_b[1:]))
# gather per-split overlap or None
matrix = []
for bookend_a, bookend_b in zip(bookends_a, bookends_b):
if min(bookend_a[1], bookend_b[1]) <= max(bookend_a[0], bookend_b[0]):
overlap = None
else:
overlap = (max(bookend_a[0], bookend_b[0]), min(bookend_a[1], bookend_b[1]))
matrix.append(overlap)
return matrix
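# Illustrative trace (not part of the original source) of the Retro example from
# the docstring above, with vector_a=[0.99, 0.01, 0.0] and vector_b=[0.98, 0.02, 0.0]:
#   expansion_a = [0, 0.99, 1.0, 1.0]    expansion_b = [0, 0.98, 1.0, 1.0]
#   bookends_a  = [(0, 0.99), (0.99, 1.0), (1.0, 1.0)]
#   bookends_b  = [(0, 0.98), (0.98, 1.0), (1.0, 1.0)]
#   matrix      = [(0, 0.98), (0.99, 1.0), None]
# so pretraining only draws train samples from the range that preprocessing also
# treated as train.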
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
import logging
import os
import time
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
import numpy
import torch
from megatron.core.datasets.blended_megatron_dataset_config import BlendedMegatronDatasetConfig
from megatron.core.datasets.indexed_dataset import IndexedDataset
from megatron.core.datasets.megatron_dataset import MegatronDataset
from megatron.core.datasets.megatron_tokenizer import MegatronTokenizer
from megatron.core.datasets.utils import Split
from megatron.core.datasets.utils_s3 import S3Config, is_s3_path
from megatron.core.utils import log_single_rank
logger = logging.getLogger(__name__)
_PAD_TOKEN_ID = -1
@dataclass
class GPTDatasetConfig(BlendedMegatronDatasetConfig):
"""Configuration object for Megatron Core GPT datasets"""
reset_position_ids: bool = None
"""Option to reset the position IDs in the dataset at an interval"""
reset_attention_mask: bool = None
"""Option to reset the attention mask from the dataset"""
eod_mask_loss: bool = None
"""Option to enable the EOD mask loss"""
create_attention_mask: bool = True
"""Option to enable the attention masks generation. Can be disabled if attention kernel
generates masks by itself.
"""
drop_last_partial_validation_sequence: bool = True
"""Option to drop the last partial validation sequence"""
add_extra_token_to_sequence: bool = True
"""Option to draw sequences with one extra token to ensure the sample input tokens and sample
output tokens are both of the desired sequence length
"""
s3_cache_path: str = None
"""Path for caching indices for s3 dataloading."""
def __post_init__(self) -> None:
"""Do asserts and set fields post init"""
super().__post_init__()
assert self.tokenizer is not None
assert self.reset_position_ids is not None
assert self.reset_attention_mask is not None
assert self.eod_mask_loss is not None
class GPTDataset(MegatronDataset):
"""The base GPT dataset
Args:
indexed_dataset (IndexedDataset): The IndexedDataset around which to build the GPTDataset
dataset_path (Optional[str]): The real path on disk to the dataset, for bookkeeping
indexed_indices (numpy.ndarray): The set of the documents indices to expose
num_samples (Optional[int]): The number of samples to draw from the indexed dataset. When None, build as many samples as correspond to one epoch.
index_split (Split): The indexed_indices Split
config (GPTDatasetConfig): The config
"""
def __init__(
self,
indexed_dataset: IndexedDataset,
dataset_path: Optional[str],
indexed_indices: numpy.ndarray,
num_samples: Optional[int],
index_split: Split,
config: GPTDatasetConfig,
) -> None:
super().__init__(
indexed_dataset, dataset_path, indexed_indices, num_samples, index_split, config
)
self.masks_and_position_ids_are_cacheable = not any(
[
self.config.reset_position_ids,
self.config.reset_attention_mask,
self.config.eod_mask_loss,
]
)
self.masks_and_position_ids_are_cached = False
self.cached_attention_mask = None
self.cached_loss_mask = None
self.cached_position_ids = None
try:
self._pad_token_id = self.config.tokenizer.pad
except Exception:
self._pad_token_id = _PAD_TOKEN_ID
(
self.document_index,
self.sample_index,
self.shuffle_index,
) = self._build_document_sample_shuffle_indices()
@staticmethod
def numel_low_level_dataset(low_level_dataset: IndexedDataset) -> int:
"""Abstract method implementation
For GPT, the underlying IndexedDataset should be split by sequence, as opposed to, say,
BERT, which should be split by document
Args:
low_level_dataset (IndexedDataset): The underlying IndexedDataset
Returns:
int: The number of unique elements in the underlying IndexedDataset
"""
return low_level_dataset.sequence_lengths.shape[0]
@staticmethod
def build_low_level_dataset(dataset_path: str, config: GPTDatasetConfig) -> IndexedDataset:
"""Abstract method implementation
Args:
dataset_path (str): The real path prefix to the IndexedDataset .bin and .idx files
config (GPTDatasetConfig): The config
Returns:
IndexedDataset: The underlying IndexedDataset
"""
if is_s3_path(dataset_path):
return IndexedDataset(
dataset_path,
multimodal=False,
mmap=config.mmap_bin_files,
s3_config=S3Config(path_to_idx_cache=config.s3_cache_path),
)
return IndexedDataset(dataset_path, multimodal=False, mmap=config.mmap_bin_files)
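# Illustrative note (assumption, not part of the original source): dataset_path is
# the shared prefix of the .idx/.bin pair, so "/data/corpus/train_text" refers to
# train_text.idx and train_text.bin, and an "s3://..." prefix takes the S3 branch
# above, caching the .idx file under config.s3_cache_path.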
def __len__(self) -> int:
"""Abstract method implementation
Returns:
int: The length of the dataset
"""
return self.sample_index.shape[0] - 1
def __getitem__(self, idx: Optional[int]) -> Dict[str, torch.Tensor]:
"""Abstract method implementation
Args:
idx (Optional[int]): The index into the dataset
Returns:
Dict[str, torch.Tensor]: The sample information wrapped in a dictionary
"""
if idx is None:
# Batch padding sequence so the index does not matter
text, _ = self._query_document_sample_shuffle_indices(0)
else:
text, _ = self._query_document_sample_shuffle_indices(idx)
text = torch.from_numpy(text).long()
if self.config.add_extra_token_to_sequence:
tokens = text[:-1].contiguous()
labels = text[1:].contiguous()
else:
tokens = text
labels = torch.roll(text, shifts=-1, dims=0)
labels[-1] = self._pad_token_id
if (
not self.masks_and_position_ids_are_cacheable
or not self.masks_and_position_ids_are_cached
):
attention_mask, loss_mask, position_ids = _get_ltor_masks_and_position_ids(
tokens,
self.config.tokenizer.eod,
self.config.reset_position_ids,
self.config.reset_attention_mask,
self.config.eod_mask_loss,
self.config.create_attention_mask,
)
if self.masks_and_position_ids_are_cacheable:
self.cached_attention_mask = attention_mask
self.cached_loss_mask = loss_mask
self.cached_position_ids = position_ids
self.masks_and_position_ids_are_cached = True
else:
attention_mask = self.cached_attention_mask
loss_mask = self.cached_loss_mask
position_ids = self.cached_position_ids
# For padded sequences, mask the loss
loss_mask[labels == self._pad_token_id] = 0.0
# For padded sequences, ensure the embedding layer can map the token ID
tokens[tokens == self._pad_token_id] = 0
labels[labels == self._pad_token_id] = 0
# Batch padding sequence so we mask the loss
if idx is None:
loss_mask = torch.zeros_like(loss_mask)
if self.config.create_attention_mask:
return {
"tokens": tokens,
"labels": labels,
"attention_mask": attention_mask,
"loss_mask": loss_mask,
"position_ids": position_ids,
}
else:
return {
"tokens": tokens,
"labels": labels,
"loss_mask": loss_mask,
"position_ids": position_ids,
}
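# Worked example (illustrative, not part of the original source): with
# sequence_length=8 and add_extra_token_to_sequence=True, a raw draw of 9 token
# ids text=[5, 3, 7, 1, 4, 2, 6, 0, 9] yields
#   tokens = [5, 3, 7, 1, 4, 2, 6, 0]
#   labels = [3, 7, 1, 4, 2, 6, 0, 9]
# i.e. labels are tokens shifted left by one, which is why one extra token is
# drawn per sample.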
def _query_document_sample_shuffle_indices(
self, idx: int
) -> Tuple[numpy.ndarray, numpy.ndarray]:
"""Get the text (token ids) and document ids for a given index
Args:
idx (int): The index into the dataset
Returns:
Tuple[numpy.ndarray, numpy.ndarray]: The text ids and document ids
"""
# Do the shuffle mapping
idx = self.shuffle_index[idx]
# Get the beginning and end documents and offsets
doc_index_beg, doc_index_beg_offset = self.sample_index[idx]
doc_index_end, doc_index_end_offset = self.sample_index[idx + 1]
document_ids = []
sample_parts = []
# Sample spans a single document
if doc_index_beg == doc_index_end:
# Add the document id
document_ids.append(self.document_index[doc_index_beg])
# Add the entire sample
sample_parts.append(
self.dataset.get(
self.document_index[doc_index_beg],
offset=doc_index_beg_offset,
length=doc_index_end_offset
- doc_index_beg_offset
+ self.config.add_extra_token_to_sequence,
)
)
# Sample spans multiple documents
else:
for i in range(doc_index_beg, doc_index_end + 1):
# Add the document id
document_ids.append(self.document_index[i])
# Add the sample part
offset = 0 if i > doc_index_beg else doc_index_beg_offset
length = (
None
if i < doc_index_end
else doc_index_end_offset + self.config.add_extra_token_to_sequence
)
sample_parts.append(
self.dataset.get(self.document_index[i], offset=offset, length=length)
)
assert len(document_ids) == len(
sample_parts
), f"len(document_ids) ({len(document_ids)}) != len(sample_parts) ({len(sample_parts)})"
length = sum(map(len, sample_parts))
# Pad the sample if necessary
if length < (self.config.sequence_length + self.config.add_extra_token_to_sequence):
sample_parts.append(
[self._pad_token_id]
* (self.config.sequence_length + self.config.add_extra_token_to_sequence - length)
)
return (
numpy.concatenate(sample_parts, dtype=numpy.int64),
numpy.array(document_ids, dtype=numpy.int64),
)
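# Worked example (illustrative, not part of the original source): if the shuffled
# sample maps to consecutive sample_index rows (doc_index=3, offset=100) and
# (doc_index=5, offset=17), the sample is stitched from the tail of
# document_index[3] starting at token 100, all of document_index[4], and the
# first 17 (+1 extra) tokens of document_index[5].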
def _build_document_sample_shuffle_indices(
self,
) -> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:
"""Build the document index, the sample index, and the shuffle index
The document index:
-- 1-D
-- An ordered array of document ids
The sample index:
-- 2-D
-- The document indices and offsets which mark the start of every sample
The shuffle index:
-- 1-D
-- A random permutation of the index range of the sample index
Returns:
Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]: The document index, the sample index, and the shuffle index
"""
path_to_cache = self.config.path_to_cache
if path_to_cache is None and not self.config.mock:
path_to_cache = os.path.join(
self.dataset.path_prefix, "cache", f"{type(self).__name__}_indices"
)
if path_to_cache:
get_path_to = lambda suffix: os.path.join(
path_to_cache,
f"{self.unique_description_hash}-{type(self).__name__}-{self.index_split.name}-{suffix}",
)
path_to_description = get_path_to("description.txt")
path_to_document_index = get_path_to("document_index.npy")
path_to_sample_index = get_path_to("sample_index.npy")
path_to_shuffle_index = get_path_to("shuffle_index.npy")
cache_hit = all(
map(
os.path.isfile,
[
path_to_description,
path_to_document_index,
path_to_sample_index,
path_to_shuffle_index,
],
)
)
else:
cache_hit = False
if not path_to_cache or (
not cache_hit
and (not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0)
):
log_single_rank(
logger,
logging.INFO,
f"Build and save the {type(self).__name__} {self.index_split.name} indices",
)
self.built_anew_on_cache_miss = True
t_beg = time.time()
sequence_length = self.config.sequence_length
num_tokens_per_epoch = self._get_num_tokens_per_epoch()
num_epochs = self._get_num_epochs(num_tokens_per_epoch)
if num_epochs == 1:
separate_final_epoch = False
else:
# Get the number of samples for the last epoch
num_samples_sans_final_epoch = (
(num_epochs - 1) * num_tokens_per_epoch
- self.config.add_extra_token_to_sequence
) // sequence_length
num_samples_from_final_epoch = self.num_samples - num_samples_sans_final_epoch
num_samples_per_epoch = (
num_tokens_per_epoch - self.config.add_extra_token_to_sequence
) // sequence_length
# num_samples_from_final_epoch should be non-negative
assert num_samples_from_final_epoch >= 0
# num_samples_from_final_epoch should not exceed max value
assert num_samples_from_final_epoch <= num_samples_per_epoch + 1
# Separate the final epoch if it falls below the threshold
threshold = 0.80
separate_final_epoch = num_samples_from_final_epoch < int(
threshold * num_samples_per_epoch
)
log_single_rank(
logger,
logging.DEBUG,
f"> num_samples_from_final_epoch: {num_samples_from_final_epoch}",
)
log_single_rank(logger, logging.DEBUG, f"> threshold: {threshold}")
log_single_rank(
logger, logging.DEBUG, f"> num_samples_per_epoch: {num_samples_per_epoch}"
)
log_single_rank(
logger, logging.DEBUG, f"> separate_final_epoch: {separate_final_epoch}"
)
numpy_random_state = numpy.random.RandomState(self.config.random_seed)
# Build the document index
document_index = _build_document_index(
self.indices, num_epochs, numpy_random_state, separate_final_epoch
)
# Build the sample index
from megatron.core.datasets import helpers
if self.index_split == Split.valid:
drop_last_partial_sequence = self.config.drop_last_partial_validation_sequence
else:
drop_last_partial_sequence = True
assert document_index.dtype == numpy.int32
assert self.dataset.sequence_lengths.dtype == numpy.int32
if len(document_index) * 2 > len(self.dataset.sequence_lengths):
# Heuristic: if "access density" of sequence_lengths is relatively high,
# force loading the mmap-ed array into memory by taking a copy.
# System performance benefits come from two aspects:
# 1. **sequentially** pre-loading the whole file if we're gonna read a large fraction anyways.
# 2. GIL is held when calling into c++ code; making the c++ func faster improves parallelism.
sequence_lengths_for_cpp = self.dataset.sequence_lengths.copy()
else:
sequence_lengths_for_cpp = self.dataset.sequence_lengths
sample_index = helpers.build_sample_idx(
sequence_lengths_for_cpp,
document_index,
sequence_length,
num_epochs,
num_tokens_per_epoch,
drop_last_partial_sequence,
self.config.add_extra_token_to_sequence,
)
# Build the shuffle index
if separate_final_epoch:
shuffle_index = _build_shuffle_index(
num_samples_sans_final_epoch, sample_index.shape[0] - 1, numpy_random_state
)
else:
shuffle_index = _build_shuffle_index(
sample_index.shape[0] - 1, sample_index.shape[0] - 1, numpy_random_state
)
if path_to_cache:
os.makedirs(path_to_cache, exist_ok=True)
# Write the description
with open(path_to_description, "wt") as writer:
writer.write(self.unique_description)
numpy.save(path_to_document_index, document_index, allow_pickle=True)
numpy.save(path_to_sample_index, sample_index, allow_pickle=True)
numpy.save(path_to_shuffle_index, shuffle_index, allow_pickle=True)
else:
log_single_rank(
logger,
logging.WARNING,
f"Unable to save the {type(self).__name__} indexes because path_to_cache is None",
)
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
log_single_rank(
logger, logging.INFO, f"> total number of samples: {sample_index.shape[0] - 1}"
)
log_single_rank(logger, logging.INFO, f"> total number of epochs: {num_epochs}")
return document_index, sample_index, shuffle_index
log_single_rank(
logger, logging.INFO, f"Load the {type(self).__name__} {self.index_split.name} indices"
)
log_single_rank(
logger,
logging.INFO,
f"\tLoad the document index from {os.path.basename(path_to_document_index)}",
)
t_beg = time.time()
document_index = numpy.load(path_to_document_index, allow_pickle=True, mmap_mode='r')
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
log_single_rank(
logger,
logging.INFO,
f"\tLoad the sample index from {os.path.basename(path_to_sample_index)}",
)
t_beg = time.time()
sample_index = numpy.load(path_to_sample_index, allow_pickle=True, mmap_mode='r')
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
log_single_rank(
logger,
logging.INFO,
f"\tLoad the shuffle index from {os.path.basename(path_to_shuffle_index)}",
)
t_beg = time.time()
shuffle_index = numpy.load(path_to_shuffle_index, allow_pickle=True, mmap_mode='r')
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
log_single_rank(
logger, logging.INFO, f"> total number of samples: {sample_index.shape[0] - 1}"
)
return document_index, sample_index, shuffle_index
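# Illustrative note (assumption, not part of the original source): on a cache hit
# the three arrays are memory-mapped rather than rebuilt, so repeated runs with
# the same dataset description, split, and seed reuse files such as
#   <hash>-GPTDataset-train-document_index.npy
#   <hash>-GPTDataset-train-sample_index.npy
#   <hash>-GPTDataset-train-shuffle_index.npy
# under path_to_cache (or <path_prefix>/cache/GPTDataset_indices/ by default).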
def _get_num_tokens_per_epoch(self) -> int:
"""Calculate the number of tokens in a single epoch
Returns:
int: The number of tokens in a single epoch
"""
return int(numpy.sum(self.dataset.sequence_lengths[self.indices]))
def _get_num_epochs(self, num_tokens_per_epoch: int) -> int:
"""Calculate the number of epochs
Args:
num_tokens_per_epoch (int): The number of tokens in a single epoch
Returns:
int: The number of epochs
"""
num_epochs = 1
num_tokens = num_tokens_per_epoch
if self.num_samples is None:
return num_epochs
else:
num_tokens_requested = (
self.num_samples * self.config.sequence_length
) + self.config.add_extra_token_to_sequence
while num_tokens < num_tokens_requested:
num_epochs += 1
num_tokens += num_tokens_per_epoch
return num_epochs
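# Worked example (illustrative, not part of the original source): with
# num_tokens_per_epoch=1_000_000, sequence_length=2048,
# add_extra_token_to_sequence=True, and num_samples=1_000, the request is
# 1_000 * 2048 + 1 = 2_048_001 tokens, so the loop above returns 3 epochs
# (1_000_000 -> 2_000_000 -> 3_000_000 >= 2_048_001).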
def _build_document_index(
documents: numpy.ndarray,
num_epochs: int,
numpy_random_state: numpy.random.RandomState,
separate_final_epoch: bool,
) -> numpy.ndarray:
"""Build an array with length = num epochs * num documents
Args:
documents (numpy.ndarray): the subset of exposed document indices
num_epochs (int): The number of epochs
numpy_random_state (numpy.random.RandomState): The NumPy random state
separate_final_epoch (bool): Whether to exclude the last epoch from the global shuffle
Returns:
numpy.ndarray: The document index
"""
if not separate_final_epoch or num_epochs == 1:
document_index = numpy.mgrid[0:num_epochs, 0 : len(documents)][1]
document_index[:] = documents
document_index = document_index.reshape(-1)
document_index = document_index.astype(numpy.int32)
numpy_random_state.shuffle(document_index)
return document_index
doc_idx_first = _build_document_index(documents, num_epochs - 1, numpy_random_state, False)
doc_idx_last = _build_document_index(documents, 1, numpy_random_state, False)
return numpy.concatenate((doc_idx_first, doc_idx_last))
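# Worked example (illustrative, not part of the original source): with
# documents=[10, 11, 12], num_epochs=2, and separate_final_epoch=False, the
# result is one shuffle over both epochs, e.g. [11, 10, 12, 12, 10, 11].
# With separate_final_epoch=True, the first num_epochs-1 epochs and the final
# epoch are shuffled independently and concatenated, keeping the final epoch's
# documents grouped at the end.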
def _build_shuffle_index(
num_samples: int, total_size: int, numpy_random_state: numpy.random.RandomState
) -> numpy.ndarray:
"""Build the range [0, size) and shuffle
Args:
num_samples (int): The size of the first shuffle range [0, num_samples)
total_size (int): The size of the entire index. If larger than 'num_samples', it defines the second shuffle range [num_samples, total_size)
numpy_random_state (numpy.random.RandomState): The NumPy random state
Returns:
numpy.ndarray: The shuffle index
"""
dtype_ = numpy.uint32
if total_size >= (numpy.iinfo(numpy.uint32).max - 1):
dtype_ = numpy.int64
shuffle_idx_first = numpy.arange(start=0, stop=num_samples, step=1, dtype=dtype_)
numpy_random_state.shuffle(shuffle_idx_first)
if num_samples == total_size:
return shuffle_idx_first
shuffle_idx_last = numpy.arange(start=num_samples, stop=total_size, step=1, dtype=dtype_)
numpy_random_state.shuffle(shuffle_idx_last)
return numpy.concatenate((shuffle_idx_first, shuffle_idx_last))
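# Worked example (illustrative, not part of the original source): with
# num_samples=5 and total_size=8, the ranges [0, 5) and [5, 8) are shuffled
# independently and concatenated, e.g. [3, 0, 4, 1, 2, 7, 5, 6], so samples
# from a separated final epoch (indices >= num_samples) stay grouped at the end.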
def _get_ltor_masks_and_position_ids(
data: torch.Tensor,
eod_token: int,
reset_position_ids: bool,
reset_attention_mask: bool,
eod_mask_loss: bool,
create_attention_mask: bool,
):
"""Build masks and position id for left to right model.
Args:
data (torch.Tensor): The data tensor that holds the tokens from the dataset
eod_token (int): ID of the token that is considered the EOD
reset_position_ids (bool): Switch to reset the document position ID's
reset_attention_mask (bool): Switch to reset the attention mask
eod_mask_loss (bool): Switch to enable the EOD mask loss
create_attention_mask (bool): Switch to enable the attention masks generation. Can be disabled if attention kernel generates masks by itself.
Returns:
torch.Tensor: Attention mask needed to be used for Attention
torch.Tensor: The mask used for loss value during training
torch.Tensor: The position ID's of the token
"""
seq_length = data.numel()
if create_attention_mask:
attention_mask = torch.tril(
torch.ones((seq_length, seq_length), device=data.device)
).unsqueeze(0)
else:
attention_mask = None
# Loss mask.
loss_mask = torch.ones(seq_length, dtype=torch.float, device=data.device)
if eod_mask_loss:
loss_mask[data == eod_token] = 0.0
# Position ids.
position_ids = torch.arange(seq_length, dtype=torch.long, device=data.device)
# We need to clone as the ids will be modified based on batch index.
if reset_position_ids:
position_ids = position_ids.clone()
if reset_position_ids or reset_attention_mask:
# Find indices where EOD token is.
eod_index = position_ids[data == eod_token]
# Detach indices from positions if going to modify positions.
if reset_position_ids:
eod_index = eod_index.clone()
# Loop through EOD indices:
prev_index = 0
for j in range(eod_index.numel()):
i = eod_index[j]
# Mask attention loss.
if reset_attention_mask and attention_mask is not None:
attention_mask[0, (i + 1) :, : (i + 1)] = 0
# Reset positions.
if reset_position_ids:
position_ids[(i + 1) :] -= i + 1 - prev_index
prev_index = i + 1
if attention_mask is not None:
# Convert attention mask to binary:
attention_mask = attention_mask < 0.5
return attention_mask, loss_mask, position_ids
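# Worked example (illustrative, not part of the original source): for
# data=[5, 2, EOD, 7, 4] with reset_position_ids=True and
# reset_attention_mask=True, the single EOD at index 2 yields
#   position_ids = [0, 1, 2, 0, 1]
# and the causal mask is additionally zeroed so that tokens after the EOD
# cannot attend to the EOD or anything before it.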
class MockGPTLowLevelDataset:
seed: int = 0
size: int = 100000
max_sequence_length: int = 4096
def __init__(self, tokenizer: MegatronTokenizer) -> None:
self.tokenizer = tokenizer
rng = numpy.random.default_rng(seed=self.seed)
self.sequence_lengths = rng.integers(
low=1, high=self.max_sequence_length, size=self.size, dtype=numpy.int32
)
def __len__(self) -> int:
return self.size
def __getitem__(self, idx: int) -> numpy.number:
length = self.sequence_lengths[idx]
sample = numpy.int64(
numpy.concatenate([numpy.arange(length - 1) + 1, [self.tokenizer.eod]])
)
return sample
def get(self, idx: int, offset: int = 0, length: Optional[int] = None) -> numpy.ndarray:
if length is None:
length = self.sequence_lengths[idx] - offset
return self[idx][offset : offset + length]
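# Worked example (illustrative, not part of the original source): each mock
# document idx is simply [1, 2, ..., sequence_lengths[idx] - 1, eod], so
#   MockGPTLowLevelDataset(tokenizer).get(idx, offset=2, length=3)
# returns the three synthetic token ids [3, 4, 5] (assuming the document is
# long enough).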
class MockGPTDataset(GPTDataset):
"""The mock GPT dataset
Args:
dataset (MockGPTLowLevelDataset): The MockGPTLowLevelDataset around which to build the MockGPTDataset
dataset_path (Optional[str]): This argument is of no consequence for the MockGPTDataset
indices (numpy.ndarray): The set of the dataset indices to expose
num_samples (int): The number of samples to draw from the dataset
index_split (Split): The indices Split
config (GPTDatasetConfig): The config
"""
def __init__(
self,
dataset: MockGPTLowLevelDataset,
dataset_path: Optional[str],
indices: numpy.ndarray,
num_samples: int,
index_split: Split,
config: GPTDatasetConfig,
) -> None:
assert config.mock
super().__init__(dataset, dataset_path, indices, num_samples, index_split, config)
@staticmethod
def numel_low_level_dataset(low_level_dataset: MockGPTLowLevelDataset) -> int:
"""Abstract method implementation
Args:
low_level_dataset (MockGPTLowLevelDataset): The underlying MockGPTLowLevelDataset
Returns:
int: The number of unique elements in the underlying MockGPTLowLevelDataset
"""
return len(low_level_dataset)
@staticmethod
def build_low_level_dataset(
dataset_path: Optional[str], config: GPTDatasetConfig
) -> MockGPTLowLevelDataset:
"""Abstract method implementation
Args:
dataset_path (Optional[str]): This argument is of no consequence for the MockGPTLowLevelDataset
config (GPTDatasetConfig): The config
Returns:
MockGPTLowLevelDataset: The underlying MockGPTLowLevelDataset
"""
return MockGPTLowLevelDataset(config.tokenizer)