# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.

import json
import os
import sys
import tempfile

import nltk
import pytest
import requests

from megatron.core.datasets.indexed_dataset import IndexedDataset
from megatron.training.tokenizer.gpt2_tokenization import (
    PRETRAINED_MERGES_ARCHIVE_MAP,
    PRETRAINED_VOCAB_ARCHIVE_MAP,
)
from tools.merge_datasets import main as merge_main
from tools.preprocess_data import Encoder
from tools.preprocess_data import get_args as build_args
from tools.preprocess_data import main as build_main

__HUGGINGFACE_BERT_BASE_UNCASED_VOCAB = (
    "https://huggingface.co/bert-base-uncased/raw/main/vocab.txt"
)
__LOCAL_BERT_VOCAB = "/home/gitlab-runner/data/bert_data/vocab.txt"
__LOCAL_GPT2_MERGE = "/home/gitlab-runner/data/gpt3_data/gpt2-merges.txt"
__LOCAL_GPT2_VOCAB = "/home/gitlab-runner/data/gpt3_data/gpt2-vocab.json"


def dummy_jsonl(odir):
    # numbers
    list_numbers = [json.dumps({"text": str(i + 1)}) + "\n" for i in range(100)]
    with open(os.path.join(odir, "numbers.jsonl"), "w") as writer:
        writer.writelines(list_numbers)
    # numbers ascending
    list_numbers_ascending = [
        json.dumps({"text": " ".join([str(j + 1) for j in range(i + 1)])}) + "\n"
        for i in range(100)
    ]
    with open(os.path.join(odir, "numbers_ascending.jsonl"), "w") as writer:
        writer.writelines(list_numbers_ascending)
    # test
    list_test = []
    with open(__file__) as reader:
        for line in reader:
            list_test.append(json.dumps({"text": line}) + "\n")
    with open(os.path.join(odir, "test.jsonl"), "w") as writer:
        writer.writelines(list_test)


def build_datasets(idir, odir, extra_args=[]):
    for name in os.listdir(idir):
        sys.argv = [
            sys.argv[0],
            "--input",
            os.path.join(idir, name),
            "--output-prefix",
            os.path.join(odir, os.path.splitext(name)[0]),
        ] + extra_args
        build_main()


def merge_datasets(idir):
    sys.argv = [sys.argv[0], "--input", idir, "--output-prefix", os.path.join(idir, "merge")]
    merge_main()


def do_test_preprocess_data(temp_dir, extra_args=[]):
    # set the default nltk data path
    os.environ["NLTK_DATA"] = os.path.join(temp_dir, "nltk_data")
    nltk.data.path.append(os.environ["NLTK_DATA"])

    path_to_raws = os.path.join(temp_dir, "sample_raws")
    path_to_data = os.path.join(temp_dir, "sample_data")
    os.mkdir(path_to_raws)
    os.mkdir(path_to_data)

    # create the dummy resources
    dummy_jsonl(path_to_raws)

    # build the datasets
    build_datasets(path_to_raws, path_to_data, extra_args=extra_args)

    # merge the datasets
    merge_datasets(path_to_data)

    sys.argv = [sys.argv[0], "--input", None, "--output-prefix", None] + extra_args
    encoder = Encoder(build_args())
    encoder.initializer()

    def tokens_to_string(toks):
        for option in ["decode", "detokenize"]:
            try:
                return getattr(encoder.tokenizer, option)(toks)
            except:
                continue
        raise RuntimeError(f"{type(encoder.tokenizer)} tokenizer cannot decode or detokenize")

    merged_index = 0
    merged_dataset = IndexedDataset(os.path.join(path_to_data, "merge"))

    # sorted to ensure ordering matches merged dataset
    basenames = sorted(
        [
            name
            for name in os.listdir(path_to_data)
            if name.endswith(".idx") and not name.startswith("merge")
        ]
    )

    # index into the merged document index
    merged_doc_index_index = 0

    for basename in basenames:
        realpath_raw = f"{os.path.join(path_to_raws, '_'.join(basename.split('_')[:-2]))}.jsonl"
        realpath_doc = os.path.join(path_to_data, basename.split(".")[-2])

        dataset_index = 0
        dataset = IndexedDataset(realpath_doc)

        merged_doc_idx = merged_dataset.document_indices[
            merged_doc_index_index : merged_doc_index_index + len(dataset.document_indices)
        ]
        merged_doc_idx = merged_doc_idx - merged_doc_idx[0]

        assert (
            dataset.document_indices == merged_doc_idx
        ).all(), f"ERROR: {basename.split('_')[:-2]}: merged dataset document indices mismatch"

        merged_doc_index_index += len(dataset.document_indices) - 1

        with open(realpath_raw, "rt") as reader:
            for json_line in reader:
                toks = encoder.encode(json_line)[0]["text"]

                raw = tokens_to_string(toks)

                processed_toks = []
                while len(processed_toks) < len(toks):
                    processed_toks.extend(dataset[dataset_index])
                    dataset_index += 1
                processed = tokens_to_string(processed_toks)

                assert (
                    raw == processed
                ), f"ERROR: {basename.split('_')[:-2]}: raw and processed documents do not match"

                merged_toks = []
                while len(merged_toks) < len(toks):
                    merged_toks.extend(merged_dataset[merged_index])
                    merged_index += 1
                merged = tokens_to_string(merged_toks)

                assert (
                    raw == merged
                ), f"ERROR: {basename.split('_')[:-2]}: raw and merged documents do not match"

        print(
            f"INFO: {''.join(basename.split('_')[:-2])}: raw, processed, and merged documents match!"
        )

    print("INFO: Success!")


def gpt2_vocab(odir):
    if os.path.exists(__LOCAL_GPT2_VOCAB):
        return __LOCAL_GPT2_VOCAB
    path = os.path.join(odir, "vocab.json")
    with open(path, "wb") as writer:
        writer.write(requests.get(PRETRAINED_VOCAB_ARCHIVE_MAP['gpt2']).content)
    return path


def gpt2_merge(odir):
    if os.path.exists(__LOCAL_GPT2_MERGE):
        return __LOCAL_GPT2_MERGE
    path = os.path.join(odir, "merge.txt")
    with open(path, "wb") as writer:
        writer.write(requests.get(PRETRAINED_MERGES_ARCHIVE_MAP['gpt2']).content)
    return path


def test_preprocess_data_gpt():
    with tempfile.TemporaryDirectory() as temp_dir:

        # gpt specific args
        gpt_args = [
            "--tokenizer-type",
            "GPT2BPETokenizer",
            "--vocab-file",
            gpt2_vocab(temp_dir),
            "--merge-file",
            gpt2_merge(temp_dir),
            "--append-eod",
            "--workers",
            "10",
            "--log-interval",
            "1",
        ]

        do_test_preprocess_data(temp_dir, extra_args=gpt_args)


def bert_vocab(odir):
    if os.path.exists(__LOCAL_BERT_VOCAB):
        return __LOCAL_BERT_VOCAB
    path = os.path.join(odir, "vocab.txt")
    with open(path, "wb") as writer:
        writer.write(requests.get(__HUGGINGFACE_BERT_BASE_UNCASED_VOCAB).content)
    return path


@pytest.mark.flaky
@pytest.mark.flaky_in_dev
def test_preprocess_data_bert():
    with tempfile.TemporaryDirectory() as temp_dir:

        # bert specific args
        bert_args = [
            "--tokenizer-type",
            "BertWordPieceLowerCase",
            "--vocab-file",
            bert_vocab(temp_dir),
            "--split-sentences",
            "--workers",
            "10",
            "--log-interval",
            "1",
            "--partitions",
            "2",
            "--keep-sequential-samples",
        ]

        do_test_preprocess_data(temp_dir, extra_args=bert_args)


if __name__ == "__main__":
    test_preprocess_data_gpt()
    test_preprocess_data_bert()
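
# ---------------------------------------------------------------------------
# For reference only: a rough sketch of the standalone CLI invocations that the
# tests above drive programmatically via sys.argv. The flags mirror gpt_args and
# the merge step; every path shown here is a hypothetical placeholder, not a
# path used by the tests.
#
#   python tools/preprocess_data.py \
#       --input /path/to/corpus.jsonl \
#       --output-prefix /path/to/output/corpus \
#       --tokenizer-type GPT2BPETokenizer \
#       --vocab-file /path/to/gpt2-vocab.json \
#       --merge-file /path/to/gpt2-merges.txt \
#       --append-eod \
#       --workers 10 \
#       --log-interval 1
#
#   python tools/merge_datasets.py \
#       --input /path/to/output \
#       --output-prefix /path/to/output/merge
# ---------------------------------------------------------------------------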