Commit 667c2bcb authored by Jared Casper

Merge branch 'testing-preprocessing' into 'main'

Data preprocessing testing changes + fixes

See merge request ADLR/megatron-lm!416
parents b44dca25 a2c5e6cd
@@ -295,14 +295,19 @@ class IndexedDatasetBuilder(object):
         index = IndexedDataset(another_file)
         assert index.dtype == self.dtype

+        doc_offset = len(self.sizes)
+
         begin = self.data_offsets[-1]
-        for offset in index.data_offsets[1:]:
-            self.data_offsets.append(begin + offset)
+        for data_offset in index.data_offsets[1:]:
+            self.data_offsets.append(begin + data_offset)
         self.sizes.extend(index.sizes)
+
         begin = self.dim_offsets[-1]
         for dim_offset in index.dim_offsets[1:]:
             self.dim_offsets.append(begin + dim_offset)
+
+        self.doc_idx.extend((doc_offset + index.doc_idx)[1:])

         with open(data_file_path(another_file), 'rb') as f:
             while True:
                 data = f.read(1024)
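The loops above rebase the incoming file's offset arrays onto the end of the builder's own arrays: each per-file offset array starts at 0, so everything after the leading 0 is shifted by the builder's current last offset. A toy illustration of the same rebasing (plain Python, not Megatron code):

    # Offsets already accumulated in the builder, then a second file's
    # offsets in its own coordinates.
    self_offsets = [0, 3, 7]
    other_offsets = [0, 2, 6]

    begin = self_offsets[-1]            # 7, the current end
    for off in other_offsets[1:]:       # skip the incoming leading 0
        self_offsets.append(begin + off)

    print(self_offsets)                 # [0, 3, 7, 9, 13]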
@@ -556,8 +561,9 @@ class MMapIndexedDatasetBuilder(object):
         index = MMapIndexedDataset.Index(index_file_path(another_file))
         assert index.dtype == self._dtype

-        for size in index.sizes:
-            self._sizes.append(size)
+        offset = len(self._sizes)
+        self._sizes.extend(index.sizes)
+        self._doc_idx.extend((offset + index.doc_idx)[1:])

         # Concatenate data
         with open(data_file_path(another_file), 'rb') as f:
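The new `offset`/`_doc_idx` bookkeeping is what lets merged datasets preserve document boundaries. A toy illustration of the `(offset + index.doc_idx)[1:]` expression (plain numpy, not Megatron code):

    import numpy as np

    # Each per-file doc_idx starts with 0 and indexes into `sizes`; when a
    # second file is appended, its boundaries are shifted by the number of
    # sizes already present and its leading 0 is dropped.
    a_doc_idx = np.array([0, 2, 5])   # builder already holds 5 sizes, 2 docs
    b_doc_idx = np.array([0, 1, 3])   # incoming file, in its own coordinates

    offset = 5                        # len(self._sizes) before extending
    merged = np.concatenate([a_doc_idx, (offset + b_doc_idx)[1:]])
    print(merged)                     # [0 2 5 6 8]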
......
import os
import sys
import json
import argparse

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
                                              os.path.pardir)))
from megatron.data import indexed_dataset


def main(args):
    # Collect the unique dataset prefixes in the input directory; every
    # prefix must come with both its .bin and its .idx file.
    prefixes = set()
    for basename in os.listdir(args.input):
        prefix, ext = os.path.splitext(basename)

        if prefix in prefixes:
            continue

        if not os.path.isfile(os.path.join(args.input, basename)):
            continue

        ext_pair = '.bin' if ext == '.idx' else '.idx'
        assert os.path.isfile(os.path.join(args.input, prefix) + ext_pair), \
            f'ERROR: {ext_pair} file not provided for {os.path.join(args.input, prefix)}'

        prefixes.add(prefix)

    builder = None
    for prefix in sorted(prefixes):
        if builder is None:
            # Infer the dataset implementation from the first dataset and
            # create a matching builder for the merged output.
            dataset = indexed_dataset.make_dataset(os.path.join(args.input, prefix), 'infer')
            if isinstance(dataset, indexed_dataset.MMapIndexedDataset):
                builder = indexed_dataset.MMapIndexedDatasetBuilder(args.output_prefix + '.bin', dtype=dataset._index.dtype)
            else:
                builder = indexed_dataset.IndexedDatasetBuilder(args.output_prefix + '.bin')
            del dataset

        # Append each dataset's contents to the merged output.
        builder.merge_file_(os.path.join(args.input, prefix))

    builder.finalize(args.output_prefix + '.idx')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    group = parser.add_argument_group(title='input data')
    group.add_argument('--input', type=str, required=True,
                       help='Path to directory containing all document files to merge')

    group = parser.add_argument_group(title='output data')
    group.add_argument('--output-prefix', type=str, required=True,
                       help='Path to binary output file without suffix')

    args = parser.parse_args()

    assert os.path.isdir(args.input), \
        f'ERROR: {args.input} is not a directory or does not exist'
    assert os.path.isdir(os.path.dirname(args.output_prefix)), \
        f'ERROR: {os.path.dirname(args.output_prefix)} is not a directory or does not exist'

    main(args)
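Once the tool has run, the merged prefix can be read back with the same helper the tool itself uses. A minimal sketch, assuming a hypothetical --output-prefix of 'merged':

    from megatron.data import indexed_dataset

    # 'infer' picks the right reader from the files on disk.
    ds = indexed_dataset.make_dataset('merged', 'infer')
    print(len(ds))     # total number of items across all merged inputs
    print(ds[0][:10])  # first ten token ids of the first item (numpy array)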
@@ -122,8 +122,10 @@ def get_args():
                        choices=['lazy', 'cached', 'mmap'])

     group = parser.add_argument_group(title='runtime')
-    group.add_argument('--workers', type=int, default=1,
+    group.add_argument('--workers', type=int, required=True,
                        help='Number of worker processes to launch')
+    group.add_argument('--chunk-size', type=int, required=True,
+                       help='Chunk size assigned to each worker process')
     group.add_argument('--log-interval', type=int, default=100,
                        help='Interval between progress updates')
     args = parser.parse_args()
@@ -154,7 +156,7 @@ def main():
     encoder = Encoder(args)
     tokenizer = build_tokenizer(args)
     pool = multiprocessing.Pool(args.workers, initializer=encoder.initializer)
-    encoded_docs = pool.imap(encoder.encode, fin, 25)
+    encoded_docs = pool.imap(encoder.encode, fin, args.chunk_size)
     #encoded_docs = map(encoder.encode, fin)

     level = "document"
......