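"""Download, decompress, and preprocess The Pile data chunks into the indexed
dataset format used for BERT pretraining with Megatron-DeepSpeed, and
optionally merge them into a single dataset.

Usage (see __main__ below for details):
    python prepare_pile_data.py range 0 30   # process chunks 0-29
    python prepare_pile_data.py 2 5 8        # process chunks 2, 5, 8
    python prepare_pile_data.py merge        # merge all 30 processed chunks
"""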
import os
import sys
import time

import zstandard

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
                                             os.path.pardir,os.path.pardir)))
from megatron.data import indexed_dataset

def pile_download(download_url, file_path, i):
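    """Download Pile chunk i (.jsonl.zst) into file_path, if not already present."""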
    start = time.time()
    zstd_file_path = f"{file_path}{i:02}.jsonl.zst"
    download_path = f"{download_url}{i:02}.jsonl.zst"
    if not os.path.exists(zstd_file_path):
        os.system(f"wget -P {file_path} {download_path}")
        print(f"Finished downloading chunk {i} in {time.time() - start} sec")

def pile_decompress(download_url, file_path, i):
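    """Decompress chunk i's .jsonl.zst into a .jsonl file, downloading it
    first if needed."""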
    zstd_file_path = f"{file_path}{i:02}.jsonl.zst"
    output_path = f"{file_path}{i:02}.jsonl"
    if not os.path.exists(output_path):
        if not os.path.exists(zstd_file_path):
            pile_download(download_url, file_path, i)
        start = time.time()
        with open(zstd_file_path, 'rb') as compressed:
            decomp = zstandard.ZstdDecompressor()
            with open(output_path, 'wb') as destination:
                decomp.copy_stream(compressed, destination)
        os.remove(zstd_file_path)
        print(f"Finished decompressing chunk {i} in {time.time() - start} sec")

def pile_preprocess(download_url, file_path, vocab_file, num_workers, i):
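    """Preprocess chunk i with Megatron's preprocess_data.py into indexed
    .bin/.idx files, downloading and decompressing the chunk first if needed."""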
    json_file_path = f"{file_path}{i:02}.jsonl"
    output_prefix = f"{file_path}pile_bert_train_{i:02}"
    if not os.path.exists(f"{output_prefix}_text_sentence.idx"):
        if not os.path.exists(json_file_path):
            pile_decompress(download_url, file_path, i)
        start = time.time()
        cmd = f"python ../../tools/preprocess_data.py \
                --input {json_file_path} \
                --output-prefix {output_prefix} \
                --vocab {vocab_file} \
                --dataset-impl mmap \
                --tokenizer-type BertWordPieceLowerCase \
                --split-sentences \
                --workers {num_workers} "
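        # preprocess_data.py writes {output_prefix}_text_sentence.bin/.idx,
        # which is what the existence check above and pile_merge() rely on.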
        # The command above may hit a MemoryError, since memory usage is
        # proportional to num_workers. In that case we delete the incomplete
        # output and the user should retry with a smaller num_workers.
        # In our experience, chunks 6, 7, 9, 17, 18, 20, 21, 24, and 27 have
        # particularly large memory usage.
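        # (For example, rerun only the failing chunk via
        # "python prepare_pile_data.py <chunk_id>" after lowering num_workers
        # in __main__.)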
        if os.system(cmd) == 0: # Success
            os.remove(json_file_path)
        else:
            print(f"Error: chunk {i} preprocessing got error, delete \
                    incomplete output. If MemoryError appeared, please retry \
                    with num_workers smaller than {num_workers}.")
            if os.path.exists(f"{output_prefix}_text_sentence.idx"):
                os.remove(f"{output_prefix}_text_sentence.idx")
            if os.path.exists(f"{output_prefix}_text_sentence.bin"):
                os.remove(f"{output_prefix}_text_sentence.bin")
        print(f"Finished preprocessing chunk {i} in {time.time() - start} sec")

def pile_merge(file_path):
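    """Merge the 30 preprocessed chunk datasets into one indexed dataset."""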
    start = time.time()
    num_chunks = 30
    vocab_size = 30524
    for i in range(num_chunks):
        output_prefix = f"{file_path}pile_bert_train_{i:02}"
        assert os.path.exists(f"{output_prefix}_text_sentence.idx")
        assert os.path.exists(f"{output_prefix}_text_sentence.bin")
    builder = indexed_dataset.make_builder(
        f"{file_path}pile_bert_train_text_sentence.bin", impl="mmap",
        vocab_size=vocab_size)
    for i in range(num_chunks):
        chunk_file = f"{file_path}pile_bert_train_{i:02}_text_sentence"
        print(f"Merging file {chunk_file}")
        builder.merge_file_(chunk_file)
    print("Finalizing merged file ...")
    builder.finalize(f"{file_path}pile_bert_train_text_sentence.idx")
    print(f"Finished merging in {time.time() - start} sec")
    # After verifying the merged data with a real training run, you may want
    # to delete the per-chunk files:
    # for i in range(num_chunks):
    #     output_prefix = f"{file_path}pile_bert_train_{i:02}"
    #     os.remove(f"{output_prefix}_text_sentence.idx")
    #     os.remove(f"{output_prefix}_text_sentence.bin")

if __name__ == '__main__':
    # Path where all output files are downloaded and stored during the whole
    # process. Estimated peak storage usage is around 1.6 TB (or 780 GB if you
    # skip the final merge). Memory usage is proportional to num_workers below
    # (it can reach roughly 300 GB when num_workers is around 20).
    file_path = "/blob/data/the_pile_bert/"
    # The raw Pile data comes as 30 compressed .zst chunks. To process all
    # chunks on a single machine, run "python prepare_pile_data.py range 0 30".
    # You can also split the chunks across multiple machines to speed things
    # up, since processing one chunk can take hours. The whole process uses
    # only CPU.
    if sys.argv[1] == "merge":
        # "python prepare_pile_data.py merge" means merge all 30 processed data
        # chunks. Run it only after all 30 chunks are preprocessed. The memory
        # usage during merge is about 600GB. If you don't have enough memory,
        # one solution is to directly use the 30 data chunks as multiple
        # datasets. See '--data-path' in
        # github.com/microsoft/Megatron-DeepSpeed/blob/main/megatron/arguments.py
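        # (Hypothetical illustration of that alternative: Megatron-style
        # blended datasets are typically given as alternating weight/prefix
        # pairs, e.g.
        # "--data-path 1 <file_path>pile_bert_train_00_text_sentence
        #              1 <file_path>pile_bert_train_01_text_sentence ...".)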
        pile_merge(file_path)
    else:
        if sys.argv[1] == "range":
            # "python prepare_pile_data.py range 0 30" means process chunk 0-29
            selected_chunk = range(int(sys.argv[2]), int(sys.argv[3]))
        else:
            # "python prepare_pile_data.py 2 5 8" means process chunk 2, 5, 8
            selected_chunk = [int(x) for x in sys.argv[1:]]
        print("selected_chunk: ", selected_chunk)
        # Number of preprocessing worker processes. Adjust based on your CPU
        # and memory.
        num_workers = 20
        # Where the raw Pile data can be downloaded. The URL may change in the
        # future; contact EleutherAI (https://github.com/EleutherAI/the-pile)
        # if it stops working.
        download_url = "https://the-eye.eu/public/AI/pile/train/"
        vocab_file = "bert-large-uncased-vocab.txt"
        vocab_url = "https://s3.amazonaws.com/models.huggingface.co/bert/bert-large-uncased-vocab.txt"
        if not os.path.exists(vocab_file):
            os.system(f"wget {vocab_url}")
        os.makedirs(file_path, exist_ok=True)

        for i in selected_chunk:
            pile_preprocess(download_url, file_path, vocab_file, num_workers, i)