Commit 4e867b3c authored by jerrrrry's avatar jerrrrry
Browse files

Initial commit

parents
# RETRO MODEL
## Table of contents
- [1. Training Setup](#1-training-setup)
- [2. Data Preprocessing](#2-data-preprocessing)
- [3. Configurations](#3-configurations)
## 1. Training setup
<a id="markdown-training-setup" name="training-setup"></a>
To run the model using a docker container run it as follows
```
PYTORCH_IMAGE=nvcr.io/nvidia/pytorch:23.09-py3
CHECKPOINT_PATH="" #<Specify path>
TENSORBOARD_LOGS_PATH=""#<Specify path>
docker run \
--gpus=all \
--ipc=host \
--workdir /workspace/megatron-lm \
-v /path/to/data:/path/to/data \
-v /path/to/megatron-lm:/workspace/megatron-lm \
megatron-lm nvcr.io/nvidia/pytorch:23.09-py3 \
bash examples/retro/train_retro_2b_distributed.sh $CHECKPOINT_PATH $TENSORBOARD_LOGS_PATH"
```
NOTE: Depending on the environment you are running it the above command might look slightly different.
NOTE: Due to how Retro preprocess and caches elements of the pretraining dataset before training begins, some arguments are auto-loaded from the Retro preprocessing configuration. These loaded arguments include:
- `--data-path`
- `--data-cache-path`
- `--eval-interval`
- `--eval-iters`
- `--global-batch-size`
- `--tokenizer-type`
- `--tokenizer-model`
- `--vocab-file`
- `--merge-file`
- `--seed`
- `--seq-length`
- `--train-samples`
## 2. Data Preprocessing
<a id="markdown-data-preprocessing" name="data-preprocessing"></a>
Retro preprocesses and caches data prior to pretraining, to greatly speed up pretraining. During data preprocessing, the retrieval database is built, and neighbor IDs are queried for each sample within the pretraining dataset. Please see `preprocess_data.sh` for an example script to preprocess data for Retro. The reference documentation for data preprocessing can be found [here](tools/retro/README.md).
## 3. Configurations
<a id="markdown-configurations" name="configurations"></a>
The example in this folder shows you how to run a 2B model. Below are a few other example configurations.
### 857M
```
--num-layers 24 \
--hidden-size 1024 \
--num-attention-heads 16 \
--seq-length 2048 \
--tensor-model-parallel-size 1 \
--pipeline-model-parallel-size 1 \
```
### 4B
```
--num-layers 48 \
--hidden-size 2560 \
--num-attention-heads 32 \
--tensor-model-parallel-size 1 \
--pipeline-model-parallel-size 1 \
```
#!/bin/bash
set -u
unset NCCL_DEBUG
######## Megatron, Retro dirs. ########
REPO_DIR="<path/to/megatron/repo>"
RETRO_PROJECT_DIR="<path/to/retro/project/directory>"
######## Task (e.g., db, index, query). ########
# This script takes a single argument, which specifies the retro task to be
# performed. The available tasks are: db-build, index-train, index-add, and
# query-neighbors.
# ~~ Examples ~~
# RETRO_TASKS="db-build" # Build the retrieval database
# RETRO_TASKS="index-train" # Train the index
# RETRO_TASKS="index-add" # Add data to the index
# RETRO_TASKS="query-neighbors" # Perform query pretraining for neighbors
# You can also provide the task as a command-line argument when executing the
# script. Example: ./preprocess_data.sh index-add
RETRO_TASKS=$1
######## Data. ########
DATA_BLEND="<see --data-path in arguments.py>"
######## Index. ########
RETRO_INDEX_STR="OPQ32_64,IVF65536_HNSW8,PQ32"
RETRO_INDEX_NTRAIN=66625331
RETRO_INDEX_TRAIN_LOAD_FRACTION=0.97
RETRO_INDEX_ADD_LOAD_FRACTION=0.95
######## GPT. ########
RETRO_GPT_SEED=1234
RETRO_GPT_SPLIT="98,2,0"
RETRO_GPT_DATA_PATH=${DATA_BLEND}
RETRO_GPT_TRAIN_SAMPLES=200000
RETRO_GPT_EVAL_INTERVAL=2000
RETRO_GPT_EVAL_ITERS=50
RETRO_GPT_LR_DECAY_SAMPLES=175000
RETRO_GPT_LR_WARMUP_SAMPLES=10000
RETRO_GPT_SEQ_LENGTH=2048
RETRO_GPT_GLOBAL_BATCH_SIZE=256
RETRO_GPT_CHUNK_LENGTH=64
######## Query. ########
RETRO_QUERY_NUM_NEIGHBORS_QUERY=200
RETRO_QUERY_NUM_NEIGHBORS_SAVE=20
RETRO_QUERY_EF_SEARCH=32
RETRO_QUERY_NPROBE=4096
######## Args. ########
ARGS=" \
--distributed-timeout-minutes 600 \
--tensor-model-parallel-size 1 \
--pipeline-model-parallel-size 1 \
--num-layers 24 \
--hidden-size 1024 \
--num-attention-heads 16 \
--micro-batch-size 1 \
--global-batch-size ${RETRO_GPT_GLOBAL_BATCH_SIZE} \
--seq-length 512 \
--max-position-embeddings 512 \
--load ${RETRO_PROJECT_DIR}/checkpoints/bert \
--exit-on-missing-checkpoint \
--no-load-optim \
--data-path [null] \
--tokenizer-type BertWordPieceLowerCase \
--vocab-file ${RETRO_PROJECT_DIR}/tokenizer/bert-large-uncased-vocab.txt \
--split ${RETRO_GPT_SPLIT} \
--distributed-backend nccl \
--lr 0.0001 \
--lr-decay-style linear \
--min-lr 1.0e-5 \
--train-samples ${RETRO_GPT_TRAIN_SAMPLES} \
--lr-decay-samples ${RETRO_GPT_LR_DECAY_SAMPLES} \
--lr-warmup-samples ${RETRO_GPT_LR_WARMUP_SAMPLES} \
--weight-decay 1e-2 \
--clip-grad 1.0 \
--eval-interval ${RETRO_GPT_EVAL_INTERVAL} \
--eval-iters ${RETRO_GPT_EVAL_ITERS} \
--bf16 \
--no-data-sharding \
--no-gradient-accumulation-fusion \
--no-async-tensor-model-parallel-allreduce \
--bert-embedder-type megatron \
--output-bert-embeddings \
\
--retro-project-dir ${RETRO_PROJECT_DIR} \
--retro-tasks ${RETRO_TASKS} \
--retro-bert-vocab-file tokenizer/bert-large-uncased-vocab.txt \
--retro-bert-tokenizer-type BertWordPieceLowerCase \
\
--retro-gpt-seed ${RETRO_GPT_SEED} \
--retro-gpt-tokenizer-type GPTSentencePieceTokenizer \
--retro-gpt-tokenizer-model /path/to/tokenizer/model \
--retro-gpt-seq-length ${RETRO_GPT_SEQ_LENGTH} \
--retro-gpt-chunk-length ${RETRO_GPT_CHUNK_LENGTH} \
--retro-gpt-global-batch-size ${RETRO_GPT_GLOBAL_BATCH_SIZE} \
--retro-gpt-eval-interval ${RETRO_GPT_EVAL_INTERVAL} \
--retro-gpt-eval-iters ${RETRO_GPT_EVAL_ITERS} \
--retro-gpt-split ${RETRO_GPT_SPLIT} \
--retro-gpt-data-path ${RETRO_GPT_DATA_PATH} \
--retro-gpt-train-samples ${RETRO_GPT_TRAIN_SAMPLES} \
\
--retro-index-str ${RETRO_INDEX_STR} \
--retro-index-ntrain ${RETRO_INDEX_NTRAIN} \
--retro-index-train-load-fraction ${RETRO_INDEX_TRAIN_LOAD_FRACTION} \
--retro-index-add-load-fraction ${RETRO_INDEX_ADD_LOAD_FRACTION} \
--no-retro-index-delete-training-embeddings \
--no-retro-index-delete-added-codes \
\
--retro-query-num-neighbors-query ${RETRO_QUERY_NUM_NEIGHBORS_QUERY} \
--retro-query-num-neighbors-save ${RETRO_QUERY_NUM_NEIGHBORS_SAVE} \
--retro-query-ef-search ${RETRO_QUERY_EF_SEARCH} \
--retro-query-nprobe ${RETRO_QUERY_NPROBE} \
"
######## Command. ########
NPROCS=8 # Number of GPUs.
CMD="\
cd ${REPO_DIR} && pwd && \
export PYTHONPATH=$PYTHONPATH:${REPO_DIR} && \
python -m torch.distributed.run \
--nproc_per_node ${NPROCS} \
--nnodes 1 \
--node_rank ${NODE_RANK} \
--master_addr ${MASTER_ADDR} \
--master_port 6000 \
tools/retro/preprocess_data.py ${ARGS} \
"
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~"
echo "CMD = '$CMD'."
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~"
eval $CMD
#!/bin/bash
# Runs the "307M" parameter Retro model.
export CUDA_DEVICE_MAX_CONNECTIONS=1
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NUM_NODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NUM_NODES))
CHECKPOINT_PATH=$1 #<Specify path>
TENSORBOARD_LOGS_PATH=$2 #<Specify path>
DISTRIBUTED_ARGS=(
--nproc_per_node $GPUS_PER_NODE
--nnodes $NUM_NODES
--master_addr $MASTER_ADDR
--master_port $MASTER_PORT
)
######## GPT or Retro? ########
# 0 : GPT.
# 1 : Retro
ADD_RETRIEVER=1
######## Megatron, Retro dirs. ########
RETRO_PROJECT_DIR="<path/to/retro/project/directory>"
######## Model, training args. ########
# ** Note: --seq-length auto loaded from Retro project dir.
RETRO_MODEL_ARGS=(
--num-layers 32
--hidden-size 2048
--num-attention-heads 32
)
# ** Note: --data-path, --tokenizer-type, and --tokenizer-model auto loaded from Retro project dir.
DATA_ARGS=(
--split 98,2,0
)
MODEL_PARALLEL_ARGS=(
--tensor-model-parallel-size 8
--pipeline-model-parallel-size 1
)
# ** Note: --eval-interval, --eval-iters auto loaded from Retro project dir.
EVAL_AND_LOGGING_ARGS=(
--log-interval 100
--save-interval 10000
--eval-interval 1000
--save $CHECKPOINT_PATH
--load $CHECKPOINT_PATH
--eval-iters 10
--tensorboard-dir $TENSORBOARD_LOGS_PATH
)
TRAINING_ARGS=" \
--retro-project-dir ${RETRO_PROJECT_DIR} \
--transformer-impl transformer_engine \
--num-workers 8 \
--micro-batch-size 4 \
--lr-decay-samples 166400000 \
--lr-warmup-samples 162761 \
--lr 6.0e-4 \
--min-lr 6.0e-5 \
--lr-decay-style cosine \
--clip-grad 1.0 \
--weight-decay 0.1 \
--adam-beta1 0.9 \
--adam-beta2 0.95 \
--init-method-std 0.023 \
--log-params-norm \
--log-num-zeros-in-grad \
--bf16 \
--no-data-sharding \
"
if [ "$ADD_RETRIEVER" = "1" ]; then
TRAINING_ARGS+=" --retro-add-retriever"
fi
######## Command. ########
torchrun ${DISTRIBUTED_ARGS[@]} pretrain_retro.py \
${RETRO_MODEL_ARGS[@]} \
${TRAINING_ARGS} \
${MODEL_PARALLEL_ARGS[@]} \
${DATA_ARGS[@]} \
${EVAL_AND_LOGGING_ARGS[@]}
import os
import torch
from torch.optim import Adam
from torch.utils.data import DataLoader
from functools import partial
from pathlib import Path
from megatron.core import parallel_state
from megatron.core import dist_checkpointing
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.models.gpt.gpt_model import GPTModel
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec
from megatron.core.datasets.utils import compile_helpers
from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder
from megatron.core.datasets.gpt_dataset import GPTDatasetConfig, MockGPTDataset
from megatron.training.tokenizer.tokenizer import _NullTokenizer
_SEQUENCE_LENGTH = 64
def initialize_distributed(tensor_model_parallel_size=1, pipeline_model_parallel_size=1):
parallel_state.destroy_model_parallel()
# Torch setup for distributed training
rank = int(os.environ['LOCAL_RANK'])
world_size = torch.cuda.device_count()
torch.cuda.set_device(rank)
torch.distributed.init_process_group(world_size=world_size, rank=rank)
# Megatron core distributed training initialization
parallel_state.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size)
def model_provider():
"""Build the model."""
transformer_config = TransformerConfig(
num_layers=2,
hidden_size=12,
num_attention_heads=4,
use_cpu_initialization=True,
pipeline_dtype=torch.float32,
)
gpt_model = GPTModel(
config=transformer_config,
transformer_layer_spec=get_gpt_layer_local_spec(),
vocab_size=100,
max_sequence_length=_SEQUENCE_LENGTH,
)
return gpt_model
def get_train_data_iterator():
if torch.distributed.is_available() and torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
compile_helpers()
torch.distributed.barrier()
else:
compile_helpers()
config = GPTDatasetConfig(
random_seed=0,
sequence_length=_SEQUENCE_LENGTH,
reset_position_ids=False,
reset_attention_mask=False,
eod_mask_loss=False,
tokenizer=_NullTokenizer(vocab_size=_SEQUENCE_LENGTH),
)
datasets = BlendedMegatronDatasetBuilder(
MockGPTDataset, [1000, None, None], lambda: True, config
).build()
train_dataloader = DataLoader(datasets[0], batch_size=8, shuffle=True)
train_iterator = iter(train_dataloader)
return train_iterator
def forward_step_func(data_iterator, model):
def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):
losses = output_tensor.float()
loss_mask = loss_mask.view(-1).float()
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# If you have data parallel reduce loss across data parallel groups.
# If pipeline parallel, loss computation is done only in last stage.
return loss, {'lm loss': loss}
data = next(data_iterator)
tokens = data['tokens'].to(device)
attention_mask = data['attention_mask'].to(device)
position_ids = data['position_ids'].to(device)
labels = data['labels'].to(device)
loss_mask = data['loss_mask'].to(device)
output_tensor = model(tokens, position_ids, attention_mask,
labels=labels)
return output_tensor, partial(loss_func, loss_mask)
def save_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict = gpt_model.sharded_state_dict(prefix='')
dist_checkpointing.save(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
def load_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict=gpt_model.sharded_state_dict(prefix='')
checkpoint = dist_checkpointing.load(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
gpt_model.load_state_dict(checkpoint)
return gpt_model
if __name__ == "__main__":
initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1)
model_parallel_cuda_manual_seed(123)
gpt_model = model_provider()
device = torch.device("cuda")
gpt_model.to(device)
optim = Adam(gpt_model.parameters())
train_iterator = get_train_data_iterator()
forward_backward_func = get_forward_backward_func()
# Running the model for 5 iterations
for _ in range(5):
optim.zero_grad()
losses_reduced = forward_backward_func(
forward_step_func=forward_step_func,
data_iterator=train_iterator,
model=gpt_model,
num_microbatches=1,
seq_length=_SEQUENCE_LENGTH,
micro_batch_size=8,
decoder_seq_length=_SEQUENCE_LENGTH,
forward_only=False)
optim.step()
print(f'Losses reduced : {losses_reduced}')
# Saving the model
ckpt_path = os.getcwd() + '/ckpt'
Path(ckpt_path).mkdir(exist_ok=True)
save_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path=ckpt_path)
# Loading the model
gpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path=ckpt_path)
gpt_model.to(device)
print('Successfully loaded the model')
# T5 MODEL
## Table of contents
- [1. Training Setup](#1-training-setup)
- [2. Configurations](#2-configurations)
- [3. Training Results](#3-training-results)
## 1. Training setup
<a id="markdown-training-setup" name="training-setup"></a>
To run the model on a Slurm based cluster
```
PYTORCH_IMAGE=nvcr.io/nvidia/pytorch:23.09-py3
ACCOUNT_NAME=""
PARTITION=""
JOB_NAME=""
NUM_NODES=1
CHECKPOINT_PATH="" #<Specify path to checkpoint>
TENSORBOARD_LOGS_PATH=""#<Specify path to tensorboard log>
VOCAB_FILE="" #<Specify path to file>/bert-large-cased-vocab.txt
DATA_PATH="" #<Specify path and file prefix>_text_document
srun -N $NUM_NODES --container-image $PYTORCH_IMAGE --container-mounts "/path/to/data:/path/to/data,/path/to/megatron-lm:/workspace/megatron-lm" --account $ACCOUNT -N 1 -J $JOB_NAME -p $PARTITION --no-container-mount-home -c "
cd /workspace/megatron-lm
./examples/t5/train_t5_220m_distributed.sh $CHECKPOINT_PATH $TENSORBOARD_LOGS_PATH $VOCAB_FILE $DATA_PATH"
```
## 2. Configurations
<a id="markdown-configurations" name="configurations"></a>
The architecture arguments below shows configuration for T5 220M model.
### 220M
```
--num-layers 12 \
--hidden-size 768 \
--num-attention-heads 12 \
--kv-channels 64 \
--ffn-hidden-size 3072 \
--encoder-seq-length 512 \
--decoder-seq-length 128 \
--max-position-embeddings 512 \
--tensor-model-parallel-size 1 \
--pipeline-model-parallel-size 1 \
```
## 3. Training Results
<a id="markdown-training-results" name="training-results"></a>
Below is the training curve for the 220M model on Pile dataset. The training takes 4 days on 32 GPUs, with batch size of 2048.
Finetuning on SQUAD dataset, the validation result is: 63.44\%
<p align="center">
<img src="./t5_mcore_train_curve.png" width="800" height="400">
</p>
#!/bin/bash
# Runs the "220M" parameter model
export CUDA_DEVICE_MAX_CONNECTIONS=1
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NUM_NODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NUM_NODES))
CHECKPOINT_PATH=$1 #<Specify path>
TENSORBOARD_DIR=$2 #<Specify path>
VOCAB_FILE=$3 #<Specify path to file>/bert-large-cased-vocab.txt
DATA_PATH=$4 #<Specify path and file prefix>_text_document
DISTRIBUTED_ARGS="
--nproc_per_node $GPUS_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
--master_port $MASTER_PORT
"
T5_ARGS="
--encoder-num-layers 12 \
--decoder-num-layers 12 \
--hidden-size 768 \
--num-attention-heads 12 \
--kv-channels 64 \
--ffn-hidden-size 3072 \
--encoder-seq-length 512 \
--decoder-seq-length 128 \
--max-position-embeddings 512 \
--micro-batch-size 64 \
--global-batch-size 512 \
--lr 0.0001 \
--train-iters 1000000 \
--lr-decay-iters 1000000 \
--lr-decay-style linear \
--min-lr 0.00001 \
--weight-decay 1e-2 \
--lr-warmup-fraction .01 \
--clip-grad 1.0 \
--bf16 \
--vocab-extra-ids 100 \
--init-method-std 0.015 \
--transformer-impl transformer_engine \
--tensor-model-parallel-size 1 \
--pipeline-model-parallel-size 1 \
--attention-backend auto \
"
DATA_ARGS="
--data-path $DATA_PATH \
--vocab-file $VOCAB_FILE \
--tokenizer-type BertWordPieceCase \
--split 99982,9,9 \
"
OUTPUT_ARGS="
--log-interval 100 \
--tensorboard-dir ${TENSORBOARD_DIR} \
--save-interval 500 \
--eval-interval 1000 \
--eval-iters 10
"
torchrun $DISTRIBUTED_ARGS pretrain_t5.py \
$T5_ARGS \
$DATA_ARGS \
$OUTPUT_ARGS \
--distributed-backend nccl \
--save $CHECKPOINT_PATH \
--load $CHECKPOINT_PATH \
## Quick Start
This guide for Megatron Core walks you through the following tasks:
* Initialize Megatron Core on two GPUS.
* Build a GPT model with a tensor model parallel size of two and a pipeline parallel size of one.
* Train the model for five iterations using Megatron Core schedules.
* Save the model using the distributed checkpoint format.
* Load the model.
**NOTE:** The following sample was tested using Megatron Core version 0.8.0 and NGC PyTorch Container version 24.02.
### Set Up Your Environment
1. Run a new Docker container.
1. Clone the Megatron GitHub repo in it.
```
docker run --ipc=host --shm-size=512m --gpus 2 -it nvcr.io/nvidia/pytorch:24.02-py3
git clone https://github.com/NVIDIA/Megatron-LM.git && cd Megatron-LM
```
<br>
### Write Your First Training Loop
In this task, you create a sample GPT model split across tensors (Tensor model parallel) on two GPUS, and run a forward pass through it using a MockGPT dataset helper class that was created in Megatron Core.
<br>
**NOTE:** All of the following steps are in the [run_simple_mcore_train_loop.py](https://github.com/NVIDIA/Megatron-LM/tree/main/examples/run_simple_mcore_train_loop.py) script. To run the ``run_simple_mcore_train_loop.py`` script:
```
PYTHONPATH=$PYTHON_PATH:./megatron torchrun --nproc-per-node 2 examples/run_simple_mcore_train_loop.py
```
1. Initialize the distributed training and set up the model parallel:
The following utility, when called, initializes your distributed setup:
```python
import os
import torch
from megatron.core import parallel_state
def initialize_distributed(tensor_model_parallel_size = 1, pipeline_model_parallel_size = 1):
# Torch setup for distributed training
rank = int(os.environ['LOCAL_RANK'])
world_size = torch.cuda.device_count()
torch.cuda.set_device(rank)
torch.distributed.init_process_group(world_size=world_size, rank=rank)
# Megatron core distributed training initialization
parallel_state.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size)
```
<br>
1. Set up the GPT model:
Use the following code snippet to create a GPT model. For a list of other configurations that you can pass into the model, open and review [transformer_config.py](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/transformer/transformer_config.py).
```
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.models.gpt.gpt_model import GPTModel
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec
def model_provider():
"""Build the model."""
transformer_config = TransformerConfig(
num_layers=2,
hidden_size=12,
num_attention_heads=4,
use_cpu_initialization=True,
pipeline_dtype=torch.float32)
gpt_model = GPTModel(
config=transformer_config,
transformer_layer_spec=get_gpt_layer_local_spec(),
vocab_size=100,
max_sequence_length=64)
return gpt_model
```
<br>
1. Set up the GPT mock dataset:
Use the following code snippet to explore the mock dataset utility.
* To train the model using your data, use the `GPTDataset` class in [gpt_dataset.py](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/datasets/gpt_dataset.py).
* To find more information about Megatron Core data pipeline, see the [data pipeline readme.md](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/datasets/readme.md?ref_type=heads).
```
import torch
from torch.utils.data import DataLoader
from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder
from megatron.core.datasets.gpt_dataset import GPTDatasetConfig, MockGPTDataset
from megatron.training.tokenizer.tokenizer import _NullTokenizer
from megatron.core.datasets.utils import compile_helpers
_SEQUENCE_LENGTH = 64
def get_train_data_iterator():
if torch.distributed.is_available() and torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
compile_helpers()
torch.distributed.barrier()
else:
compile_helpers()
config = GPTDatasetConfig(
random_seed=0,
sequence_length=_SEQUENCE_LENGTH,
reset_position_ids=False,
reset_attention_mask=False,
eod_mask_loss=False,
tokenizer=_NullTokenizer(vocab_size=_SEQUENCE_LENGTH),
)
datasets = BlendedMegatronDatasetBuilder(
MockGPTDataset, [1000, None, None], lambda: True, config
).build()
train_dataloader = DataLoader(datasets[0], batch_size=8, shuffle=True)
train_iterator = iter(train_dataloader)
return train_iterator
```
<br>
1. Add a forward step function:
Megatron Core uses [schedules.py](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/pipeline_parallel/schedules.py) to run the model. Define a forward step function that takes the data iterator and the model as input and produces the output tensor and a loss function.
```python
from functools import partial
def forward_step_func(data_iterator, model):
def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):
losses = output_tensor.float()
loss_mask = loss_mask.view(-1).float()
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# If you have data parallel reduce loss across data parallel groups.
# If pipeline parallel, loss computation is done only in last stage.
return loss, {'lm loss': loss}
data = next(data_iterator)
tokens = data['tokens'].to(device)
attention_mask = data['attention_mask'].to(device)
position_ids = data['position_ids'].to(device)
labels = data['labels'].to(device)
loss_mask = data['loss_mask'].to(device)
output_tensor = model(tokens, position_ids, attention_mask,
labels=labels)
return output_tensor, partial(loss_func, loss_mask)
```
<br>
1. Define your load and save distributed checkpoints:
Megatron Core uses distributed checkpoints for loading and saving models. This allows you to convert the model from one parallel setting to another when you load it.
For example, a model trained with tensor parallel size `2`, can be loaded again as a tensor model with parallel size `4`.
```python
from megatron.core import dist_checkpointing
def save_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict = gpt_model.sharded_state_dict(prefix='')
dist_checkpointing.save(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
def load_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict=gpt_model.sharded_state_dict(prefix='')
checkpoint = dist_checkpointing.load(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
gpt_model.load_state_dict(checkpoint)
return gpt_model
```
<br>
1. Add the main function:
The following code snippet is the main function that needs to go into your script. It runs the model for five iterations, saves, and loads it.
```python
from pathlib import Path
from torch.optim import Adam
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed
if __name__ == "__main__":
initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1)
model_parallel_cuda_manual_seed(123)
gpt_model = model_provider()
device = torch.device("cuda")
gpt_model.to(device)
optim = Adam(gpt_model.parameters())
train_iterator = get_train_data_iterator()
forward_backward_func = get_forward_backward_func()
# Running the model for 5 iterations
for _ in range(5):
optim.zero_grad()
losses_reduced = forward_backward_func(
forward_step_func=forward_step_func,
data_iterator=train_iterator,
model=gpt_model,
num_microbatches=1,
seq_length=64,
micro_batch_size=8,
decoder_seq_length=64,
forward_only=False)
optim.step()
print(f'Losses reduced : {losses_reduced}')
# Saving the model
save_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')
# Loading the model
gpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')
gpt_model.to(device)
print('Successfully loaded the model')
```
<br>
### Review Advanced Examples
To review more advanced examples, explore [pretrain_gpt.py](https://github.com/NVIDIA/Megatron-LM/blob/main/pretrain_gpt.py). ``pretrain_gpt.py`` has more complex training loops and includes the following Megatron Core features:
* pipeline parallel
* context parallel
* rope embeddings
* mixture of experts
# Megatron-Core
Megatron-Core is an open-source PyTorch-based library that contains GPU-optimized techniques and cutting-edge system-level optimizations. It abstracts them into composable and modular APIs, allowing full flexibility for developers and model researchers to train custom transformers at-scale on NVIDIA accelerated computing infrastructure. This library is compatible with all NVIDIA Tensor Core GPUs, including FP8 acceleration support for [NVIDIA Hopper architectures](https://www.nvidia.com/en-us/data-center/technologies/hopper-architecture/).
Megatron-Core offers core building blocks such as attention mechanisms, transformer blocks and layers, normalization layers, and embedding techniques. Additional functionality like activation re-computation, distributed checkpointing is also natively built-in to the library. The building blocks and functionality are all GPU optimized, and can be built with advanced parallelization strategies for optimal training speed and stability on NVIDIA Accelerated Computing Infrastructure. Another key component of the Megatron-Core library includes advanced model parallelism techniques (tensor, sequence, pipeline, context, and MoE expert parallelism).
Megatron-Core can be used with [NVIDIA NeMo](https://www.nvidia.com/en-us/ai-data-science/products/nemo/), an enterprise-grade AI platform. Alternatively, you can explore Megatron-Core with the native PyTorch training loop [here](https://github.com/NVIDIA/Megatron-LM/tree/main/examples). Visit [Megatron-Core documentation](https://docs.nvidia.com/megatron-core/developer-guide/latest/index.html) to learn more.
## Quick links
- [Benchmark using NVIDIA NeMo](https://docs.nvidia.com/nemo-framework/user-guide/latest/overview.html#performance-benchmarks)
- [Multimodal example (LLaVA training pipeline)](https://github.com/NVIDIA/Megatron-LM/tree/main/examples/multimodal)
- [Mixture-of-Experts](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/transformer/moe)
- [Training Mamba-based Language Models](https://github.com/NVIDIA/Megatron-LM/tree/main/examples/mamba)
## StragglerDetector for a TP Group
The file `megatron/core/utils.py` has a class named `StragglerDetector` which supports Python Contexts.
It can be used to find straggling TP group based on the RTT of the ranks in the TP Group. It also collects
Power/Temp/Utilization for GPUs, which can additionally be used to narrow down to the exact GPU in the TP Group,
assuming the straggling was caused by hardware anomaly in a given GPU.<br>
This class supports collecting timing events for various steps of a given iteration. It
keeps collecting such timing events on a per rank basis, and when the reporter is invoked
during a logging interval, it computes the min and max of certain metric across all
ranks and logs the observed metric and the rank as follows
```
0: INFO:megatron.core.utils:[2024-03-14 23:07:56] | MnRtt/Rnk: 3453.08ms/8 | MxRtt/Rnk: 3468.20ms/0 | MnPwr/Rnk: 601796W/8 | MxPwr/Rnk: 683801W/18 | MnTmp/Rnk: 52C/0 | MxTmp/Rnk: 65C/21 | MnUtl/Rnk: 97%/8 | MxUtl/Rnk: 100%/6 | MnClk/Rnk: 1950MHz/28 | MxClk/Rnk: 1980MHz/0 | MnDRtt/Rnk: 14.27ms/23 | MxDRtt/Rnk: 34.65ms/3 | MnEtpt/Rnk: 296.02TF/0 | MxEtpt/Rnk: 297.32TF/8
```
<hr>
### Description of the metrics
Each metric is prefixed with `Mn` or `Mx` to represent `Minimum` or `Maximum`. Each metric is also suffixed with the rank where the metric was measured. The metrics are averaged over the logging interval. Between the prefix and the rank is the name of the metric as follows
- Rtt : RoundTrip Time (time spent in all the traced ops per iteration)
- Pwr : GPU Power
- Tmp : GPU Temperature
- Utl : GPU Utilization
- Clk : GPU Clock
- DRtt: get_batch latency
- Etpt: Estimated throughput. This is derived from actual computed throughput dividied by Rtt. Since we do not collect timing for backward pass, the value is further divided by three to come up with estimated throughput.
<hr>
### Command Line activation
To start using the StragglerDetector, need to pass the following argument `--log-straggler`. It optionally also takes two additional parameters. Default disabled
- `--disable-straggler-on-startup` - whether to keept the StragglerDetector disabled on startup and enable later. Default enabled
- `--straggler-ctrlr-port` - The StragglerDetector can toggle between on/off just by sending `curl Rank0Host:port`. Default port is 65535. Every time it is turned
- `--straggler-minmax-count` - If set to > 1 (N), it prints N Top and Bottom Etpt/Rank pairs as shown below
```
0: INFO:megatron.core.utils:^^^^ Bottom 4 Ranks with lowest Etpt(TF): 296.02/0, 296.17/2, 296.23/1, 296.23/4,
0: INFO:megatron.core.utils:^^^^ Top 4 Ranks with highest Etpt(TF): 297.28/15, 297.28/11, 297.32/12, 297.32/8,
```
<hr>
### Programming the StragglerDetector
The StragglerDetector class supports context, and its implementation is a Singleton.
- Initialization
```
# initialization, where StragglerDetector will be used
from megatron.core.utils import StragglerDetector
stimer = StragglerDetector()
```
- One time for each rank
```
# one time before the training loop starts
stimer.configure(world, rank, enabled=True, port=65545)
# Arguments to configure
# world : World Size
# rank : The rank of this trainer
# mmcnt : (Optional) Number of ranks to print for showing Min/Max Etpt
# amp : (Optional) Set to 3.0 if we only use timers in fwd pass
# port : (Optional) control port, useful only for rank-0
# prefill : (Optional) howmany Events to pre-populate
# enabled : (Optional) whether or not collection is enabled on startup
```
- To Capture time
```
# whereever timing need to be captured
with stimer:
do_operation()
# special case for get_batch
with stimer(bdata=True):
input,... = get_batch(iterator,...)
```
- Logging in main training loop
```
# logging
total_flops = 0.0
iteration = 0
# inside the main training loop
while training:
iteration += 1
do_step()
total_flops += get_computed_flops()
if iteration % log_interval:
stimer.report(total_flops, log_interval)
total_flops = 0.0
```
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
import megatron.core.tensor_parallel
import megatron.core.utils
from megatron.core import parallel_state
from megatron.core.distributed import DistributedDataParallel
from megatron.core.inference_params import InferenceParams
from megatron.core.model_parallel_config import ModelParallelConfig
from megatron.core.package_info import (
__contact_emails__,
__contact_names__,
__description__,
__download_url__,
__homepage__,
__keywords__,
__license__,
__package_name__,
__repository_url__,
__shortversion__,
__version__,
)
from megatron.core.timers import Timers
# Alias parallel_state as mpu, its legacy name
mpu = parallel_state
__all__ = [
"parallel_state",
"tensor_parallel",
"utils",
"DistributedDataParallel",
"InferenceParams",
"ModelParallelConfig",
"Timers",
]
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
ENABLE_EXPERIMENTAL = False
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
import dataclasses
import json
import os
import torch
import torch.nn as nn
from megatron.core import parallel_state
def get_config_logger_path(config):
return getattr(config, 'config_logger_dir', '')
def has_config_logger_enabled(config):
return get_config_logger_path(config) != ''
# For each prefix, holds a counter and increases it every time we dump with this
# prefix.
__config_logger_path_counts = {}
def get_path_count(path):
"""
keeps tracks of number of times we've seen the input `path` and return count-1
"""
global __config_logger_path_counts
if not path in __config_logger_path_counts:
__config_logger_path_counts[path] = 0
count = __config_logger_path_counts[path]
__config_logger_path_counts[path] += 1
return count
def get_path_with_count(path):
"""
calls get_path_count and appends returned value to path
"""
return f'{path}.iter{get_path_count(path)}'
class JSONEncoderWithMcoreTypes(json.JSONEncoder):
def default(self, o):
if type(o).__name__ in ['function', 'ProcessGroup']:
return str(o)
if type(o).__name__ in ['dict', 'OrderedDict']:
return {k: self.default(v) for k, v in o.items()}
if type(o).__name__ in ['list', 'ModuleList']:
return [self.default(val) for val in o]
if type(o).__name__ == 'UniqueDescriptor':
return {
attr: self.default(getattr(o, attr))
for attr in filter(lambda x: not x.startswith('__'), dir(o))
}
if type(o) is torch.dtype:
return str(o)
# if it's a Float16Module, add "Float16Module" to the output dict
if type(o).__name__ == 'Float16Module':
return {'Float16Module': {'module': self.default(o.module)}}
# If it's a nn.Module subchild, either print its children or itself if leaf.
if issubclass(type(o), nn.Module):
if len(getattr(o, '_modules', {})) > 0:
return {key: self.default(val) for key, val in o._modules.items()}
else:
return str(o)
if type(o).__name__ in ['ABCMeta', 'type', 'AttnMaskType']:
return str(o)
if dataclasses.is_dataclass(o) or type(o).__name__ in ['ModuleSpec', 'TransformerConfig']:
return dataclasses.asdict(o)
try:
return super().default(o)
except:
return str(o)
def log_config_to_disk(config, dict_data, prefix=''):
"""
Encodes the input dict (dict_data) using the JSONEncoderWithMcoreTypes
and dumps to disk, as specified via path
"""
path = get_config_logger_path(config)
assert path is not None, 'Expected config_logger_dir to be non-empty in config.'
if 'self' in dict_data:
if prefix == '':
prefix = type(dict_data['self']).__name__
del dict_data['self']
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
rank = parallel_state.get_all_ranks()
path = get_path_with_count(os.path.join(path, f'{prefix}.rank_{rank}'))
if type(dict_data).__name__ == 'OrderedDict':
torch.save(dict_data, f'{path}.pth')
else:
with open(f'{path}.json', 'w') as fp:
json.dump(dict_data, fp, cls=JSONEncoderWithMcoreTypes)
__all__ = ['has_config_logger_enabled', 'log_config_to_disk']
CXXFLAGS += -O3 -Wall -shared -std=c++11 -fPIC -fdiagnostics-color
CPPFLAGS += $(shell python3 -m pybind11 --includes)
LIBNAME = helpers_cpp
LIBEXT = $(shell python3-config --extension-suffix)
OUT = $(LIBNAME)$(LIBEXT)
SRC = helpers.cpp
default: $(OUT)
$(OUT): $(SRC)
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $< -o $@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import numpy
from megatron.core.datasets.indexed_dataset import IndexedDataset
from megatron.core.datasets.masked_dataset import (
MaskedWordPieceDataset,
MaskedWordPieceDatasetConfig,
)
from megatron.core.datasets.utils import Split
@dataclass
class BERTMaskedWordPieceDatasetConfig(MaskedWordPieceDatasetConfig):
"""Configuration object for Megatron Core BERT WordPiece datasets"""
classification_head: bool = None
"""Option to perform the next sequence prediction during sampling"""
def __post_init__(self) -> None:
"""Do asserts and set fields post init"""
super().__post_init__()
assert self.classification_head is not None
class BERTMaskedWordPieceDataset(MaskedWordPieceDataset):
"""The BERT dataset that assumes WordPiece tokenization
Args:
indexed_dataset (IndexedDataset): The IndexedDataset around which to build the MegatronDataset
dataset_path (str): The real path on disk to the dataset, for bookkeeping
indexed_indices (numpy.ndarray): The set of the documents indices to expose
num_samples (Optional[int]): The number of samples to draw from the indexed dataset. When None, build as many samples as correspond to one epoch.
index_split (Split): The indexed_indices Split
config (BERTMaskedWordPieceDatasetConfig): The config
"""
def __init__(
self,
indexed_dataset: IndexedDataset,
dataset_path: str,
indexed_indices: numpy.ndarray,
num_samples: Optional[int],
index_split: Split,
config: BERTMaskedWordPieceDatasetConfig,
) -> None:
super().__init__(
indexed_dataset, dataset_path, indexed_indices, num_samples, index_split, config
)
self.token_lookup = list(self.config.tokenizer.inv_vocab.keys())
# Account for the single <cls> and two <sep> token ids
self.sample_index = self._build_sample_index(
self.config.sequence_length - 3, 2 if self.config.classification_head else 1
)
@staticmethod
def _key_config_attributes() -> List[str]:
"""Inherited method implementation
Returns:
List[str]: The key config attributes
"""
return super(
BERTMaskedWordPieceDataset, BERTMaskedWordPieceDataset
)._key_config_attributes() + ["classification_head"]
def __getitem__(self, idx: int) -> Dict[str, Union[int, numpy.ndarray]]:
"""Abstract method implementation
Args:
idx (int): The index into the dataset
Returns:
Dict[str, Union[int, numpy.ndarray]]: The
"""
idx_beg, idx_end, target_sequence_length = self.sample_index[idx]
sample = [self.dataset[i] for i in range(idx_beg, idx_end)]
numpy_random_state = numpy.random.RandomState(seed=(self.config.random_seed + idx) % 2**32)
assert target_sequence_length <= self.config.sequence_length
# Split the sample into contiguous subsegments A and B
pivot = len(sample)
is_next_random = False
if self.config.classification_head:
assert len(sample) > 1, "the sample must contain at least two sentences"
pivot = 1
if len(sample) >= 3:
pivot = numpy_random_state.randint(low=1, high=len(sample))
is_next_random = numpy_random_state.random() < 0.5
split_A = []
for sample_a in sample[:pivot]:
split_A.extend(sample_a)
split_B = []
for sample_b in sample[pivot:]:
split_B.extend(sample_b)
if is_next_random:
split_A, split_B = split_B, split_A
# Trim the subsegments from either end to a desired joint length
length_A = len(split_A)
length_B = len(split_B)
if length_A + length_B <= target_sequence_length:
truncated = False
else:
while length_A + length_B > target_sequence_length:
split = split_A if length_A > length_B else split_B
if numpy_random_state.random() < 0.5:
del split[0]
else:
del split[-1]
length_A = len(split_A)
length_B = len(split_B)
truncated = True
# Merge the subsegments and create the token assignment labels
tokens = [self.config.tokenizer.cls, *split_A, self.config.tokenizer.sep]
assignments = [0 for _ in range(1 + len(split_A) + 1)]
if split_B:
tokens += [*split_B, self.config.tokenizer.sep]
assignments += [1 for _ in range(len(split_B) + 1)]
# Masking
tokens, masked_positions, masked_labels, _, _ = self._create_masked_lm_predictions(
tokens, target_sequence_length, numpy_random_state
)
# Pad the sequences and convert to NumPy
length_toks = len(tokens)
length_pads = self.config.sequence_length - length_toks
assert length_pads >= 0
tokens = numpy.array(tokens, dtype=numpy.int64)
tokens = numpy.pad(tokens, (0, length_pads), constant_values=self.config.tokenizer.pad)
assignments = numpy.array(assignments, dtype=numpy.int64)
assignments = numpy.pad(
assignments, (0, length_pads), constant_values=self.config.tokenizer.pad
)
# Get the padding mask
mask_pads = numpy.ones(length_toks, dtype=numpy.int64)
mask_pads = numpy.pad(
mask_pads, (0, length_pads), constant_values=self.config.tokenizer.pad
)
# Mask the labels
labels = numpy.zeros(self.config.sequence_length, dtype=numpy.int64) - 1
labels[masked_positions] = masked_labels
# Get the loss mask
mask_loss = numpy.zeros(self.config.sequence_length, dtype=numpy.int64)
mask_loss[masked_positions] = 1
return {
"text": tokens,
"types": assignments,
"labels": labels,
"is_random": int(is_next_random),
"padding_mask": mask_pads,
"loss_mask": mask_loss,
"truncated": int(truncated),
}
def _get_token_mask(self, numpy_random_state: numpy.random.RandomState) -> Optional[int]:
"""Abstract method implementation
80% of the time, replace the token id with mask token id. 10% of the time, replace token id
with a random token id from the vocabulary. 10% of the time, do nothing.
Args:
numpy_random_state (RandomState): The NumPy random state
Returns:
Optional[int]: The replacement token id or None
"""
if numpy_random_state.random() < 0.8:
return self.config.tokenizer.mask
else:
if numpy_random_state.random() >= 0.5:
return self.token_lookup[numpy_random_state.randint(0, len(self.token_lookup))]
return None
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
import hashlib
import json
import logging
import os
import time
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple, Union
import numpy
import torch
from megatron.core.datasets.blended_megatron_dataset_config import BlendedMegatronDatasetConfig
from megatron.core.datasets.megatron_dataset import MegatronDataset
from megatron.core.datasets.utils import normalize
from megatron.core.utils import log_single_rank
logger = logging.getLogger(__name__)
_VERBOSE = False
class BlendedDataset(torch.utils.data.Dataset):
"""Conjugating class for a set of MegatronDataset instances
Args:
datasets (List[MegatronDataset]): The MegatronDataset instances to blend
weights (List[Union[int, float]]): The weights that determine the dataset blend ratios
size (Optional[int]): The number of samples to draw from the blend. If None, for each
dataset index idx draw exactly weights[idx] samples from datasets[idx].
config (BlendedMegatronDatasetConfig): The config
Raises:
RuntimeError: When the dataset has fewer or more samples than 'size' post-initialization
"""
def __init__(
self,
datasets: List[MegatronDataset],
weights: List[Union[int, float]],
size: Optional[int],
config: BlendedMegatronDatasetConfig,
) -> None:
assert len(datasets) == len(weights)
assert len(datasets) < 32767
assert all(map(lambda _: type(_) == type(datasets[0]), datasets))
assert all(map(lambda _: _.index_split == datasets[0].index_split, datasets))
assert all(map(lambda _: _ > 0, weights))
assert all(map(lambda _: type(_) == type(weights[0]), weights))
if size is None and isinstance(weights[0], float):
assert all(map(lambda _: _ == int(_), weights))
# Alert user to unnecessary blending
if len(datasets) == 1:
log_single_rank(
logger, logging.WARNING, f"Building a BlendedDataset for a single MegatronDataset"
)
if size is not None:
weights = normalize(weights)
self.datasets = datasets
self.split = self.datasets[0].index_split
self.weights = weights
self.size = size
self.config = config
unique_identifiers = OrderedDict()
unique_identifiers["class"] = type(self).__name__
unique_identifiers["datasets"] = [dataset.unique_identifiers for dataset in self.datasets]
unique_identifiers["split"] = self.split.name
unique_identifiers["weights"] = self.weights
unique_identifiers["size"] = self.size
self.unique_description = json.dumps(
unique_identifiers, indent=4, default=lambda obj: obj.unique_identifiers
)
self.unique_description_hash = hashlib.md5(
self.unique_description.encode("utf-8")
).hexdigest()
self.built_anew_on_cache_miss = False
self.dataset_index, self.dataset_sample_index = self._build_indices()
def __len__(self) -> int:
return self.dataset_index.shape[0]
def __getitem__(self, idx: int) -> Dict[str, Union[int, numpy.ndarray]]:
dataset_id = self.dataset_index[idx]
dataset_sample_id = self.dataset_sample_index[idx]
return {"dataset_id": dataset_id, **self.datasets[dataset_id][dataset_sample_id]}
def _build_indices(self) -> Tuple[numpy.ndarray, numpy.ndarray]:
"""Build and optionally cache the dataset index and the dataset sample index
The dataset index is a 1-D mapping which determines the dataset to query. The dataset
sample index is a 1-D mapping which determines the sample to request from the queried
dataset.
Returns:
Tuple[numpy.ndarray, numpy.ndarray]: The dataset index and the dataset sample index
"""
path_to_cache = self.config.path_to_cache
if path_to_cache:
get_path_to = lambda suffix: os.path.join(
path_to_cache,
f"{self.unique_description_hash}-{type(self).__name__}-{self.split.name}-{suffix}",
)
path_to_description = get_path_to("description.txt")
path_to_dataset_index = get_path_to("dataset_index.npy")
path_to_dataset_sample_index = get_path_to("dataset_sample_index.npy")
cache_hit = all(
map(
os.path.isfile,
[path_to_description, path_to_dataset_index, path_to_dataset_sample_index],
)
)
else:
cache_hit = False
if not path_to_cache or (not cache_hit and torch.distributed.get_rank() == 0):
log_single_rank(
logger, logging.INFO, f"Build and save the {type(self).__name__} indices"
)
self.built_anew_on_cache_miss = True
# Build the dataset and dataset sample indexes
log_single_rank(
logger, logging.INFO, f"\tBuild and save the dataset and dataset sample indexes"
)
t_beg = time.time()
from megatron.core.datasets import helpers
if self.size is not None:
dataset_index = numpy.zeros(self.size, dtype=numpy.int16)
dataset_sample_index = numpy.zeros(self.size, dtype=numpy.int64)
helpers.build_blending_indices(
dataset_index,
dataset_sample_index,
self.weights,
len(self.datasets),
self.size,
_VERBOSE,
)
else:
size = sum(self.weights)
dataset_index = numpy.zeros(size, dtype=numpy.int16)
dataset_sample_index = numpy.zeros(size, dtype=numpy.int64)
helpers.build_exhaustive_blending_indices(
dataset_index, dataset_sample_index, self.weights, len(self.datasets)
)
if path_to_cache:
os.makedirs(path_to_cache, exist_ok=True)
# Write the description
with open(path_to_description, "wt") as writer:
writer.write(self.unique_description)
# Save the indexes
numpy.save(path_to_dataset_index, dataset_index, allow_pickle=True)
numpy.save(path_to_dataset_sample_index, dataset_sample_index, allow_pickle=True)
else:
log_single_rank(
logger,
logging.WARNING,
f"Cannot save the {type(self).__name__} indexes because path_to_cache is None",
)
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
return dataset_index, dataset_sample_index
log_single_rank(logger, logging.INFO, f"Load the {type(self).__name__} indices")
log_single_rank(
logger, logging.INFO, f"\tLoad the dataset index from {path_to_dataset_index}"
)
t_beg = time.time()
dataset_index = numpy.load(path_to_dataset_index, allow_pickle=True, mmap_mode='r')
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
log_single_rank(
logger,
logging.INFO,
f"\tLoad the dataset sample index from {path_to_dataset_sample_index}",
)
t_beg = time.time()
dataset_sample_index = numpy.load(
path_to_dataset_sample_index, allow_pickle=True, mmap_mode='r'
)
t_end = time.time()
log_single_rank(logger, logging.DEBUG, f"\t> time elapsed: {t_end - t_beg:4f} seconds")
return dataset_index, dataset_sample_index
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment