Commit 8ba76558 authored by Neel Kant

Merge block-dataset into ict-stable

parents 1a3f5663 3dc39c4a
@@ -42,6 +42,11 @@ def build_train_valid_test_datasets(data_prefix, data_impl, splits_string,
                                            data_impl,
                                            skip_warmup)
+    if ict_dataset:
+        titles_dataset = get_indexed_dataset_(data_prefix + '-titles',
+                                              data_impl,
+                                              skip_warmup)
+
     # Get start and end indices of train/valid/test into doc-idx
     # Note that doc-idx is designed to be num-docs + 1 so we can
     # easily iterate over it.
@@ -78,7 +83,7 @@ def build_train_valid_test_datasets(data_prefix, data_impl, splits_string,
         # Build the dataset accordingly.
         kwargs = dict(
             name=name,
-            indexed_dataset=indexed_dataset,
+            context_dataset=indexed_dataset,
             data_prefix=data_prefix,
             num_epochs=None,
             max_num_samples=train_valid_test_num_samples[index],
@@ -88,7 +93,7 @@ def build_train_valid_test_datasets(data_prefix, data_impl, splits_string,
         )
         if ict_dataset:
-            dataset = InverseClozeDataset(**kwargs)
+            dataset = InverseClozeDataset(titles_dataset=titles_dataset, **kwargs)
         else:
             dataset = BertDataset(masked_lm_prob=masked_lm_prob, **kwargs)
         # Set the original pointer so dataset remains the main dataset.
......
@@ -304,7 +304,211 @@ py::array build_mapping(const py::array_t<int64_t>& docs_,
    }
}

template<typename DocIdx>
py::array build_blocks_mapping_impl(const py::array_t<int64_t>& docs_,
const py::array_t<int32_t>& sizes_,
const py::array_t<int32_t>& titles_sizes_,
const int32_t num_epochs,
const uint64_t max_num_samples,
const int32_t max_seq_length,
const int32_t seed,
const bool verbose) {
/* Build a mapping of (start-index, end-index, doc-index) where
start and end index are the indices of the sentences in the sample
and doc-index is the index of the document they come from (used to
look up the document's title).
*/
// Consistency checks.
assert(num_epochs > 0);
assert(max_seq_length > 1);
assert(seed > 0);
// Remove bound checks.
auto docs = docs_.unchecked<1>();
auto sizes = sizes_.unchecked<1>();
auto titles_sizes = titles_sizes_.unchecked<1>();
if (verbose) {
const auto sent_start_index = docs[0];
const auto sent_end_index = docs[docs_.shape(0) - 1];
const auto num_sentences = sent_end_index - sent_start_index;
cout << " using:" << endl << std::flush;
cout << " number of documents: " << docs_.shape(0) - 1 <<
endl << std::flush;
cout << " sentences range: [" << sent_start_index <<
", " << sent_end_index << ")" << endl << std::flush;
cout << " total number of sentences: " << num_sentences <<
endl << std::flush;
cout << " number of epochs: " << num_epochs <<
endl << std::flush;
cout << " maximum number of samples: " << max_num_samples <<
endl << std::flush;
cout << " maximum sequence length: " << max_seq_length <<
endl << std::flush;
cout << " seed: " << seed << endl <<
std::flush;
}
// Mapping and its length (1D).
int64_t num_samples = -1;
DocIdx* maps = NULL;
// Perform two iterations, in the first iteration get the size
// and allocate memory and in the second iteration populate the map.
bool second = false;
for (int32_t iteration=0; iteration<2; ++iteration) {
// Set the flag on second iteration.
second = (iteration == 1);
// Current map index.
uint64_t map_index = 0;
// For each epoch:
for (int32_t epoch=0; epoch<num_epochs; ++epoch) {
if (map_index >= max_num_samples) {
if (verbose && (!second)) {
cout << " reached " << max_num_samples << " samples after "
<< epoch << " epochs ..." << endl << std::flush;
}
break;
}
// For each document:
for (int32_t doc=0; doc<(docs.shape(0) - 1); ++doc) {
// Document sentences are in [sent_index_first, sent_index_last)
const auto sent_index_first = docs[doc];
const auto sent_index_last = docs[doc + 1];
const auto target_seq_len = max_seq_length - titles_sizes[doc];
// At the beginning of the document, the previous index is the
// start index.
auto prev_start_index = sent_index_first;
// Remaining sentences in the document.
auto num_remain_sent = sent_index_last - sent_index_first;
// Detect documents with long sentences.
bool contains_long_sentence = false;
if (num_remain_sent > 1) {
for (auto sent_index=sent_index_first;
sent_index < sent_index_last; ++sent_index) {
if (sizes[sent_index] > LONG_SENTENCE_LEN){
contains_long_sentence = true;
break;
}
}
}
// If we have at least two sentences and no long sentence.
if ((num_remain_sent > 1) && (!contains_long_sentence)) {
// Set values.
auto seq_len = int32_t{0};
auto num_sent = int32_t{0};
// Loop through sentences.
for (auto sent_index=sent_index_first;
sent_index < sent_index_last; ++sent_index) {
// Add the size and number of sentences.
seq_len += sizes[sent_index];
++num_sent;
--num_remain_sent;
// If we have reached the target length,
// and more than one sentence remains in the document,
// and the sample has at least two sentences,
// or if we have reached the end of the document.
if (((seq_len >= target_seq_len) &&
(num_remain_sent > 1) &&
(num_sent > 1) ) || (num_remain_sent == 0)) {
// Populate the map.
if (second) {
const auto map_index_0 = 3 * map_index;
maps[map_index_0] = static_cast<DocIdx>(prev_start_index);
maps[map_index_0 + 1] = static_cast<DocIdx>(sent_index + 1);
maps[map_index_0 + 2] = static_cast<DocIdx>(doc);
}
// Update indices / counters.
++map_index;
prev_start_index = sent_index + 1;
seq_len = 0;
num_sent = 0;
}
} // for (auto sent_index=sent_index_first; ...
} // if (num_remain_sent > 1) {
} // for (int doc=0; doc < num_docs; ++doc) {
} // for (int epoch=0; epoch < num_epochs; ++epoch) {
if (!second) {
if (verbose) {
cout << " will create mapping for " << map_index <<
" samples" << endl << std::flush;
}
assert(maps == NULL);
assert(num_samples < 0);
maps = new DocIdx[3*map_index];
num_samples = static_cast<int64_t>(map_index);
}
} // for (int iteration=0; iteration < 2; ++iteration) {
// Shuffle.
// We need a 64 bit random number generator as we might have more
// than 2 billion samples.
std::mt19937_64 rand64_gen(seed + 1);
for (auto i=(num_samples - 1); i > 0; --i) {
const auto j = static_cast<int64_t>(rand64_gen() % (i + 1));
const auto i0 = 3 * i;
const auto j0 = 3 * j;
// Swap values.
swap(maps[i0], maps[j0]);
swap(maps[i0 + 1], maps[j0 + 1]);
swap(maps[i0 + 2], maps[j0 + 2]);
}
// Method to deallocate memory.
py::capsule free_when_done(maps, [](void *mem_) {
DocIdx *mem = reinterpret_cast<DocIdx*>(mem_);
delete[] mem;
});
// Return the numpy array.
const auto byte_size = sizeof(DocIdx);
return py::array(std::vector<int64_t>{num_samples, 3}, // shape
{3*byte_size, byte_size}, // C-style contiguous strides
maps, // the data pointer
free_when_done); // numpy array references
}
py::array build_blocks_mapping(const py::array_t<int64_t>& docs_,
const py::array_t<int>& sizes_,
const py::array_t<int>& titles_sizes_,
const int num_epochs,
const uint64_t max_num_samples,
const int max_seq_length,
const int seed,
const bool verbose) {
if (sizes_.size() > std::numeric_limits<uint32_t>::max()) {
if (verbose) {
cout << " using uint64 for data mapping..." << endl << std::flush;
}
return build_blocks_mapping_impl<uint64_t>(docs_, sizes_, titles_sizes_,
num_epochs, max_num_samples, max_seq_length, seed, verbose);
} else {
if (verbose) {
cout << " using uint32 for data mapping..." << endl << std::flush;
}
return build_blocks_mapping_impl<uint32_t>(docs_, sizes_, titles_sizes_,
num_epochs, max_num_samples, max_seq_length, seed, verbose);
}
}
PYBIND11_MODULE(helpers, m) {
    m.def("build_mapping", &build_mapping);
+   m.def("build_blocks_mapping", &build_blocks_mapping);
}
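For reference, here is a pure-Python sketch of what the new build_blocks_mapping helper computes, useful for sanity-checking the C++ on a toy corpus. It is not part of the commit; the toy arrays are illustrative, and LONG_SENTENCE_LEN = 512 is an assumption that mirrors the constant defined earlier in helpers.cpp.

import numpy as np

LONG_SENTENCE_LEN = 512  # assumption: mirrors the constant defined earlier in helpers.cpp

def build_blocks_mapping_py(doc_idx, sizes, titles_sizes, num_epochs,
                            max_num_samples, max_seq_length, seed):
    """Greedily pack consecutive sentences of each document into blocks of at
    most (max_seq_length - title_length) tokens and return shuffled
    (start_sentence, end_sentence, doc) triples, like the C++ helper."""
    samples = []
    for _ in range(num_epochs):
        if len(samples) >= max_num_samples:
            break
        for doc in range(len(doc_idx) - 1):
            first, last = int(doc_idx[doc]), int(doc_idx[doc + 1])
            target_seq_len = max_seq_length - int(titles_sizes[doc])
            num_remain_sent = last - first
            # Skip single-sentence documents and documents with a long sentence.
            if num_remain_sent <= 1:
                continue
            if any(int(sizes[s]) > LONG_SENTENCE_LEN for s in range(first, last)):
                continue
            prev_start, seq_len, num_sent = first, 0, 0
            for sent in range(first, last):
                seq_len += int(sizes[sent])
                num_sent += 1
                num_remain_sent -= 1
                if ((seq_len >= target_seq_len and num_remain_sent > 1 and num_sent > 1)
                        or num_remain_sent == 0):
                    samples.append((prev_start, sent + 1, doc))
                    prev_start, seq_len, num_sent = sent + 1, 0, 0
    samples = np.array(samples, dtype=np.int64)
    np.random.RandomState(seed + 1).shuffle(samples)  # row-wise shuffle, like the Fisher-Yates pass above
    return samples

# Toy corpus: two documents with 3 and 4 sentences.
doc_idx = np.array([0, 3, 7], dtype=np.int64)            # sentence index range per document
sizes = np.array([5, 7, 6, 4, 9, 3, 8], dtype=np.int32)  # tokens per sentence
titles_sizes = np.array([2, 3], dtype=np.int32)          # title tokens per document
print(build_blocks_mapping_py(doc_idx, sizes, titles_sizes, num_epochs=1,
                              max_num_samples=10, max_seq_length=16, seed=1234))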
+import itertools
 import random
 import os
+import sys
 import time

 import numpy as np
@@ -11,17 +13,28 @@ from megatron import print_rank_0
 from megatron import mpu
 from megatron.data import helpers


 class InverseClozeDataset(Dataset):
-    """Dataset containing sentences and various 'blocks' for an inverse cloze task."""
+    """Dataset containing sentences and their blocks for an inverse cloze task."""

-    def __init__(self, name, indexed_dataset, data_prefix,
+    def __init__(self, name, context_dataset, titles_dataset, data_prefix,
                  num_epochs, max_num_samples, max_seq_length,
                  short_seq_prob, seed):
         self.name = name
         self.seed = seed
         self.max_seq_length = max_seq_length
-        self.indexed_dataset = indexed_dataset
+        self.context_dataset = context_dataset
+        self.titles_dataset = titles_dataset
         self.short_seq_prob = short_seq_prob
+        self.rng = random.Random(self.seed)

+        self.samples_mapping = get_samples_mapping(self.context_dataset,
+                                                   self.titles_dataset,
+                                                   data_prefix,
+                                                   num_epochs,
+                                                   max_num_samples,
+                                                   self.max_seq_length,
+                                                   self.seed,
+                                                   self.name)

         tokenizer = get_tokenizer()
         self.vocab_id_list = list(tokenizer.inv_vocab.keys())
         self.vocab_id_to_token_list = tokenizer.inv_vocab
@@ -29,23 +42,35 @@ class InverseClozeDataset(Dataset):
         self.sep_id = tokenizer.sep
         self.mask_id = tokenizer.mask
         self.pad_id = tokenizer.pad
-        self.offset = 0
     def __len__(self):
-        return self.indexed_dataset.doc_idx.shape[0]
+        return self.samples_mapping.shape[0]

     def __getitem__(self, idx):
-        # get rng state corresponding to index (allows deterministic random pair)
-        rng = random.Random(idx + 20000 + self.seed)
+        start_idx, end_idx, doc_idx = self.samples_mapping[idx]
+        title = list(self.titles_dataset[int(doc_idx)])
+        context = [list(self.context_dataset[i]) for i in range(start_idx, end_idx)]
+        assert len(context) > 1

+        # avoid selecting the first or last sentence to be the query.
+        if len(context) == 2:
+            rand_sent_idx = int(self.rng.random() > 0.5)
+        else:
+            rand_sent_idx = self.rng.randint(1, len(context) - 2)

-        # get seq length. Save 2 tokens for beginning and end
-        target_seq_length = self.max_seq_length - 2
-        if rng.random() < self.short_seq_prob:
-            target_seq_length = rng.randint(5, target_seq_length)
+        # keep the query in the context 10% of the time.
+        if self.rng.random() < 0.1:
+            input = context[rand_sent_idx].copy()
+        else:
+            input = context.pop(rand_sent_idx)

+        # may still need to truncate because blocks are concluded when
+        # the sentence lengths have exceeded max_seq_length.
+        input = input[:self.max_seq_length - 2]
+        context = list(itertools.chain(*context))[:self.max_seq_length - (3 + len(title))]

-        input_data, context_data = self.get_input_and_context(idx, target_seq_length, rng)
-        input_tokens, input_token_types, input_pad_mask = input_data
-        context_tokens, context_token_types, context_pad_mask = context_data
+        input_tokens, input_token_types, input_pad_mask = self.concat_and_pad_tokens(input)
+        context_tokens, context_token_types, context_pad_mask = self.concat_and_pad_tokens(context, title)

         sample = {
             'input_text': np.array(input_tokens),
@@ -58,20 +83,12 @@ class InverseClozeDataset(Dataset):
         return sample

-    def get_sentence_split_doc(self, idx):
-        """fetch document at index idx and split into sentences"""
-        doc_start = self.indexed_dataset.doc_idx[idx]
-        doc_end = self.indexed_dataset.doc_idx[idx + 1]
-        doc_sentences_array = self.indexed_dataset[doc_start:doc_end]
-        doc_sentences = [list(arr) for arr in doc_sentences_array]
-        return doc_sentences
-
-    def concat_and_pad_tokens(self, tokens):
+    def concat_and_pad_tokens(self, tokens, title=None):
         """concat with special tokens and pad sequence to self.max_seq_length"""
         tokens = [self.cls_id] + tokens + [self.sep_id]
-        assert len(tokens) <= self.max_seq_length
+        if title is not None:
+            tokens += title + [self.sep_id]
+        assert len(tokens) <= self.max_seq_length, len(tokens)
         num_pad = self.max_seq_length - len(tokens)
         pad_mask = [0] * len(tokens) + [1] * num_pad
@@ -79,65 +96,82 @@ class InverseClozeDataset(Dataset):
         token_types = [0] * self.max_seq_length
         return tokens, token_types, pad_mask
-    def get_input_and_context(self, idx, target_seq_length, rng):
-        """fetches a sentence and its surrounding context"""
-        num_tries = 0
-        while num_tries < 20:
-            num_tries += 1
-            doc = None
-            while doc is None:
-                doc = self.get_sentence_split_doc(idx + self.offset)
-                if not doc:
-                    doc = None
-                    self.offset += 1
-
-            num_sentences = len(doc)
-            padless_max_len = self.max_seq_length - 2
-
-            # select a random sentence from the document as input
-            # TODO: consider adding multiple input sentences.
-            input_sentence_idx = rng.randint(0, num_sentences - 1)
-            input_tokens = doc[input_sentence_idx][:target_seq_length]
-            if not len(input_tokens) > 0:
-                self.offset += 1
-                continue
-
-            context_tokens = []
-            # 10% of the time, the input sentence is left in the context.
-            # The other 90% of the time, keep it out.
-            if rng.random() < 0.1:
-                context_tokens = input_tokens.copy()
-
-            view_preceding = True
-            view_radius = 1
-            while len(context_tokens) < padless_max_len:
-                # keep adding sentences while the context can accommodate more.
-                if view_preceding:
-                    examine_idx = input_sentence_idx - view_radius
-                    if examine_idx >= 0:
-                        new_tokens = doc[examine_idx]
-                        context_tokens = new_tokens + context_tokens
-                else:
-                    examine_idx = input_sentence_idx + view_radius
-                    if examine_idx < num_sentences:
-                        new_tokens = doc[examine_idx]
-                        context_tokens += new_tokens
-                    view_radius += 1
-                view_preceding = not view_preceding
-                if view_radius > num_sentences:
-                    break
-
-            # assemble the tokens and token types of the context
-            context_tokens = context_tokens[:padless_max_len]
-            if not len(context_tokens) > 0:
-                self.offset += 1
-                continue
-
-            # concatenate 'CLS' and 'SEP' tokens and add extra token types
-            input_tokens, input_token_types, input_pad_mask = self.concat_and_pad_tokens(input_tokens)
-            context_tokens, context_token_types, context_pad_mask = self.concat_and_pad_tokens(context_tokens)
-
-            return (input_tokens, input_token_types, input_pad_mask), \
-                   (context_tokens, context_token_types, context_pad_mask)
-        else:
-            raise RuntimeError("Could not get a valid data point from InverseClozeDataset")
+
+def get_samples_mapping(context_dataset,
+                        titles_dataset,
+                        data_prefix,
+                        num_epochs,
+                        max_num_samples,
+                        max_seq_length,
+                        seed,
+                        name):
+    if not num_epochs:
+        if not max_num_samples:
+            raise ValueError("Need to specify either max_num_samples "
+                             "or num_epochs")
+        num_epochs = np.iinfo(np.int32).max - 1
+    if not max_num_samples:
+        max_num_samples = np.iinfo(np.int64).max - 1
+
+    # Filename of the index mapping
+    indexmap_filename = data_prefix
+    indexmap_filename += '_{}_indexmap'.format(name)
+    if num_epochs != (np.iinfo(np.int32).max - 1):
+        indexmap_filename += '_{}ep'.format(num_epochs)
+    if max_num_samples != (np.iinfo(np.int64).max - 1):
+        indexmap_filename += '_{}mns'.format(max_num_samples)
+    indexmap_filename += '_{}msl'.format(max_seq_length)
+    indexmap_filename += '_{}s'.format(seed)
+    indexmap_filename += '.npy'
+
+    # Build the indexed mapping if not exist.
+    if torch.distributed.get_rank() == 0 and \
+       not os.path.isfile(indexmap_filename):
+        print(' > WARNING: could not find index map file {}, building '
+              'the indices on rank 0 ...'.format(indexmap_filename))
+
+        # Make sure the types match the helpers input types.
+        assert context_dataset.doc_idx.dtype == np.int64
+        assert context_dataset.sizes.dtype == np.int32
+
+        # Build samples mapping
+        verbose = torch.distributed.get_rank() == 0
+        start_time = time.time()
+        print_rank_0(' > building samples index mapping for {} ...'.format(
+            name))
+        samples_mapping = helpers.build_blocks_mapping(
+            context_dataset.doc_idx,
+            context_dataset.sizes,
+            titles_dataset.sizes,
+            num_epochs,
+            max_num_samples,
+            max_seq_length - 3,  # account for added tokens
+            seed,
+            verbose)
+        print_rank_0(' > done building samples index mapping')
+        np.save(indexmap_filename, samples_mapping, allow_pickle=True)
+        print_rank_0(' > saved the index mapping in {}'.format(
+            indexmap_filename))
+        # Make sure all the ranks have built the mapping
+        print_rank_0(' > elapsed time to build and save samples mapping '
+                     '(seconds): {:4f}'.format(
+                         time.time() - start_time))
+    # This should be a barrier but nccl barrier assumes
+    # device_index=rank which is not the case for model
+    # parallel case
+    counts = torch.cuda.LongTensor([1])
+    torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group())
+    assert counts[0].item() == torch.distributed.get_world_size(
+        group=mpu.get_data_parallel_group())
+
+    # Load indexed dataset.
+    print_rank_0(' > loading indexed mapping from {}'.format(
+        indexmap_filename))
+    start_time = time.time()
+    samples_mapping = np.load(indexmap_filename, allow_pickle=True)
+    print_rank_0('    loaded indexed file in {:3.3f} seconds'.format(
+        time.time() - start_time))
+    print_rank_0('    total number of samples: {}'.format(
+        samples_mapping.shape[0]))
+
+    return samples_mapping
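To make the new __getitem__ flow above easier to follow, here is a stripped-down, self-contained sketch of how a block is split into a query and its context. The token ids are toy values and the CLS/SEP/pad handling is omitted (that lives in concat_and_pad_tokens); it is an illustration, not part of the commit.

import itertools
import random

rng = random.Random(1234)
max_seq_length = 16

# Toy block: a title plus four tokenized sentences (ids are illustrative).
title = [7000, 7001]
context = [[1, 2, 3], [4, 5], [6, 7, 8, 9], [10, 11]]

# Avoid picking the first or last sentence as the query, as in __getitem__.
if len(context) == 2:
    rand_sent_idx = int(rng.random() > 0.5)
else:
    rand_sent_idx = rng.randint(1, len(context) - 2)

# 10% of the time the query stays in its context; otherwise it is removed.
if rng.random() < 0.1:
    query = context[rand_sent_idx].copy()
else:
    query = context.pop(rand_sent_idx)

# Truncate: 2 special tokens for the query, 3 special tokens plus the title
# for the context block, mirroring concat_and_pad_tokens.
query = query[:max_seq_length - 2]
context_tokens = list(itertools.chain(*context))[:max_seq_length - (3 + len(title))]
print(query, context_tokens)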
 import argparse
+import itertools
 import json
 import multiprocessing
 import nltk
@@ -43,18 +44,28 @@ class Encoder(object):
     def encode(self, json_line):
         text = json.loads(json_line)[self.args.json_key]
+        if not text:
+            text = "no text"
         doc_ids = []
         for sentence in Encoder.splitter.tokenize(text):
             tokens = Encoder.tokenizer.tokenize(sentence)
             ids = Encoder.tokenizer.convert_tokens_to_ids(tokens)
             if len(ids) > 0:
                 doc_ids.append(ids)
+            else:
+                print("no ids!", flush=True)
+                tokens = Encoder.tokenizer.tokenize("no text")
+                ids = Encoder.tokenizer.convert_tokens_to_ids(tokens)
+                doc_ids.append(ids)
+        if self.args.flatten and len(doc_ids) > 1:
+            doc_ids = [list(itertools.chain(*doc_ids))]
         return doc_ids, len(json_line)


 def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('--input', type=str, help='Path to input JSON')
     parser.add_argument('--vocab', type=str, help='Path to vocab.txt')
+    parser.add_argument('--flatten', action='store_true',
+                        help='Flatten the per-sentence id lists of a document into one list')
     parser.add_argument('--json-key', type=str, default='text',
                         help='Key to extract from json')
     parser.add_argument('--output-prefix', type=str, help='Path to binary output file without suffix')
......
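The effect of the new --flatten flag is simply to merge the per-sentence id lists produced by the loop above into a single document-level list. A tiny example with toy token ids (not taken from the commit):

import itertools

# Toy per-sentence token ids, as produced by Encoder.encode().
doc_ids = [[101, 7592, 102], [101, 2088, 102]]

# With --flatten, sentences are chained into one sequence per document.
doc_ids = [list(itertools.chain(*doc_ids))]
print(doc_ids)  # [[101, 7592, 102, 101, 2088, 102]]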
@@ -24,7 +24,7 @@ from megatron import get_adlr_autoresume
 from megatron import mpu
 from megatron import print_rank_0
 from megatron.checkpointing import save_checkpoint
-from megatron.data.samplers import DistributedBatchSampler, RandomSampler
+from megatron.data.samplers import DistributedBatchSampler
 from megatron.fp16 import FP16_Optimizer
@@ -102,16 +102,12 @@ def make_data_loader(dataset):
     num_workers = args.num_workers

     # Use a simple sampler with distributed batch sampler.
-    #sampler = torch.utils.data.SequentialSampler(dataset)
-    sampler = RandomSampler(dataset,
-                            replacement=True,
-                            num_samples=global_batch_size*args.train_iters)
+    sampler = torch.utils.data.SequentialSampler(dataset)
     batch_sampler = DistributedBatchSampler(sampler=sampler,
                                             batch_size=global_batch_size,
                                             drop_last=True,
                                             rank=rank,
-                                            world_size=world_size,
-                                            wrap_last=True)
+                                            world_size=world_size)

     # Torch dataloader.
     return torch.utils.data.DataLoader(dataset,
                                        batch_sampler=batch_sampler,
......
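Since the ICT samples are now pre-shuffled inside the samples mapping, a plain SequentialSampler per rank is sufficient. A minimal sketch of the resulting wiring; global_batch_size, rank, world_size, and the DataLoader keyword defaults are placeholders rather than values taken verbatim from the file:

import torch
from megatron.data.samplers import DistributedBatchSampler

def make_data_loader_sketch(dataset, global_batch_size, rank, world_size, num_workers=2):
    # The dataset's samples_mapping is already shuffled, so sequential order is fine here.
    sampler = torch.utils.data.SequentialSampler(dataset)
    batch_sampler = DistributedBatchSampler(sampler=sampler,
                                            batch_size=global_batch_size,
                                            drop_last=True,
                                            rank=rank,
                                            world_size=world_size)
    return torch.utils.data.DataLoader(dataset,
                                       batch_sampler=batch_sampler,
                                       num_workers=num_workers,
                                       pin_memory=True)  # assumption: kwargs beyond batch_sampler are not shown in the diff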
@@ -102,7 +102,7 @@ def get_train_val_test_data():
     """Load the data on rank zero and broadcast number of tokens to all GPUs."""
     args = get_args()

-    (train_data, val_data, test_data) = (None, None, None)
+    (train_data, valid_data, test_data) = (None, None, None)

     # Data loader only on rank 0 of each model parallel group.
     if mpu.get_model_parallel_rank() == 0:
@@ -115,7 +115,7 @@ def get_train_val_test_data():
         # Number of train/valid/test samples.
         train_iters = args.train_iters
-        eval_iters = args.eval_iters
+        eval_iters = (train_iters // args.eval_iters + 1) * args.eval_iters
         test_iters = args.eval_iters
         train_val_test_num_samples = [train_iters * global_batch_size,
                                       eval_iters * global_batch_size,
......