# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pretrain BERT"""

import torch
import torch.nn.functional as F

from megatron import get_args
from megatron import print_rank_0
from megatron import get_timers
from megatron import mpu
from megatron.data.dataset_utils import build_train_valid_test_datasets
from megatron.model import (BertModel,
                            BertModelFirstStage,
                            BertModelIntermediateStage,
                            BertModelLastStage)
from megatron.training import pretrain
from megatron.utils import average_losses_across_data_parallel_group


def model_provider():
    """Build the model.

    Returns a single ``BertModel`` when the pipeline-model-parallel world
    size is 1. Otherwise returns the stage-specific model for this rank, or
    a list of such models (one per virtual pipeline rank) when
    ``args.virtual_pipeline_model_parallel_size`` is set.
    """

    print_rank_0('building BERT model ...')

    args = get_args()
    # Token-type embeddings are only requested when the binary head is on.
    num_tokentypes = 2 if args.bert_binary_head else 0

    def model_provider_pipelined():
        """Build the model chunk matching this rank's pipeline stage."""
        if mpu.is_pipeline_first_stage():
            return BertModelFirstStage(
                num_tokentypes=num_tokentypes)
        if mpu.is_pipeline_last_stage():
            return BertModelLastStage(
                num_tokentypes=num_tokentypes,
                add_binary_head=args.bert_binary_head,
                parallel_output=True)
        return BertModelIntermediateStage(
            num_tokentypes=num_tokentypes)

    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            model = []
            for i in range(args.virtual_pipeline_model_parallel_size):
                # The virtual rank must be set before each build because the
                # first/last-stage checks above are queried per chunk.
                mpu.set_virtual_pipeline_model_parallel_rank(i)
                model.append(model_provider_pipelined())
        else:
            model = model_provider_pipelined()
    else:
        model = BertModel(
            num_tokentypes=num_tokentypes,
            add_binary_head=args.bert_binary_head,
            parallel_output=True)

    return model


def get_batch(data_iterator):
    """Build the batch.

    Args:
        data_iterator: iterator yielding batches, or ``None``. When ``None``
            no local data is read and ``None`` is handed to
            ``mpu.broadcast_data`` instead.

    Returns:
        Tuple ``(tokens, types, sentence_order, loss_mask, lm_labels,
        padding_mask)``. ``loss_mask`` is cast to float; every other entry
        is cast to long.
    """

    # Items and their type.
    keys = ['text', 'types', 'labels', 'is_random', 'loss_mask',
            'padding_mask']
    datatype = torch.int64

    # Broadcast data.
    data = next(data_iterator) if data_iterator is not None else None
    data_b = mpu.broadcast_data(keys, data, datatype)

    # Unpack.
    tokens = data_b['text'].long()
    types = data_b['types'].long()
    sentence_order = data_b['is_random'].long()
    loss_mask = data_b['loss_mask'].float()
    lm_labels = data_b['labels'].long()
    padding_mask = data_b['padding_mask'].long()

    return tokens, types, sentence_order, loss_mask, lm_labels, padding_mask


def _forward_through_stage(model, input_tensor, tokens, types, padding_mask,
                           lm_labels):
    """Call ``model`` with the arguments its pipeline stage expects."""
    if mpu.is_pipeline_first_stage():
        # The first stage consumes raw tokens, never an upstream activation.
        assert input_tensor is None
        if mpu.is_pipeline_last_stage():
            return model(tokens, padding_mask, tokentype_ids=types,
                         lm_labels=lm_labels)
        return model(tokens, padding_mask, tokentype_ids=types)

    # Every later stage consumes the tensor handed over by the previous one.
    assert input_tensor is not None
    if mpu.is_pipeline_last_stage():
        return model(input_tensor, padding_mask, lm_labels=lm_labels)
    return model(input_tensor, padding_mask)


def _compute_bert_losses(output_tensor, loss_mask, sentence_order):
    """Turn last-stage output into the training loss and a reporting dict."""
    lm_loss_, sop_logits = output_tensor

    lm_loss_ = lm_loss_.float()
    loss_mask = loss_mask.float()
    # Mask-weighted mean of the per-position LM loss.
    lm_loss = torch.sum(
        lm_loss_.view(-1) * loss_mask.reshape(-1)) / loss_mask.sum()

    if sop_logits is None:
        # No binary-head logits were returned: LM loss only.
        averaged_losses = average_losses_across_data_parallel_group(
            [lm_loss])
        return lm_loss, {'lm loss': averaged_losses[0]}

    sop_loss = F.cross_entropy(sop_logits.view(-1, 2).float(),
                               sentence_order.view(-1),
                               ignore_index=-1)
    sop_loss = sop_loss.float()

    loss = lm_loss + sop_loss
    averaged_losses = average_losses_across_data_parallel_group(
        [lm_loss, sop_loss])
    return loss, {'lm loss': averaged_losses[0],
                  'sop loss': averaged_losses[1]}


def forward_step(data_iterator, model, input_tensor):
    """Forward step.

    Args:
        data_iterator: passed to ``get_batch`` to obtain the batch.
        model: callable model (chunk) for this rank's pipeline stage.
        input_tensor: output of the previous pipeline stage; must be
            ``None`` on the first stage and not ``None`` on any other.

    Returns:
        On the last pipeline stage, ``(loss, losses_dict)`` where
        ``losses_dict`` has key ``'lm loss'`` and, when the model returned
        binary-head logits, ``'sop loss'``. On any other stage, the raw
        model output.
    """
    args = get_args()
    timers = get_timers()

    # Get the batch.
    timers('batch-generator').start()
    tokens, types, sentence_order, loss_mask, lm_labels, padding_mask = \
        get_batch(data_iterator)
    timers('batch-generator').stop()

    if not args.bert_binary_head:
        types = None

    # Forward pass through the model.
    output_tensor = _forward_through_stage(
        model, input_tensor, tokens, types, padding_mask, lm_labels)

    if mpu.is_pipeline_last_stage():
        return _compute_bert_losses(output_tensor, loss_mask, sentence_order)
    return output_tensor


def train_valid_test_datasets_provider(train_val_test_num_samples):
    """Build train, valid, and test datasets.

    Args:
        train_val_test_num_samples: forwarded unchanged as
            ``train_valid_test_num_samples`` to
            ``build_train_valid_test_datasets``.

    Returns:
        Tuple ``(train_ds, valid_ds, test_ds)`` as produced by
        ``build_train_valid_test_datasets``.
    """
    args = get_args()

    print_rank_0('> building train, validation, and test datasets '
                 'for BERT ...')
    train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
        data_prefix=args.data_path,
        data_impl=args.data_impl,
        splits_string=args.split,
        train_valid_test_num_samples=train_val_test_num_samples,
        max_seq_length=args.seq_length,
        masked_lm_prob=args.mask_prob,
        short_seq_prob=args.short_seq_prob,
        seed=args.seed,
        skip_warmup=(not args.mmap_warmup),
        binary_head=args.bert_binary_head)
    print_rank_0("> finished creating BERT datasets ...")

    return train_ds, valid_ds, test_ds


if __name__ == "__main__":

    pretrain(train_valid_test_datasets_provider, model_provider, forward_step,
             args_defaults={'tokenizer_type': 'BertWordPieceLowerCase'})