"sgl-router/src/routers/grpc/error.rs" did not exist on "d31d48b341dfa254814c75a6d6df2d68e4bc895b"
Commit 5394b117 authored by sunxx1's avatar sunxx1
Browse files

Merge branch 'deepspeed-branch' into 'main'

Deepspeed branch

See merge request dcutoolkit/deeplearing/dlexamples_new!22
parents 491af051 316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GLUE finetuning/evaluation."""
from megatron import get_args
from megatron import print_rank_0
from megatron import get_tokenizer
from megatron.model.classification import Classification
from tasks.eval_utils import accuracy_func_provider
from tasks.finetune_utils import finetune
def glue_classification(num_classes, Dataset,
                        name_from_datapath_func):
    """Finetune and/or evaluate a GLUE sentence-pair classification task.

    Three provider callbacks (datasets, model, end-of-epoch metrics) are
    built as closures over the arguments and handed to ``finetune``.

    Args:
        num_classes: number of classes passed to ``Classification``.
        Dataset: dataset class, called as
            ``Dataset(name, datapaths, tokenizer, max_seq_length)``.
        name_from_datapath_func: callable mapping a data path to the name
            given to the evaluation dataset built from that path.
    """

    def train_valid_datasets_provider():
        """Return the ``(train, validation)`` dataset pair."""
        cfg = get_args()
        tok = get_tokenizer()
        splits = (('training', cfg.train_data),
                  ('validation', cfg.valid_data))
        train_dataset, valid_dataset = [
            Dataset(split_name, paths, tok, cfg.seq_length)
            for split_name, paths in splits]
        return train_dataset, valid_dataset

    def model_provider():
        """Return a ``Classification`` model with two token types."""
        task = get_args().task
        print_rank_0('building classification model for {} ...'.format(task))
        return Classification(num_classes=num_classes, num_tokentypes=2)

    def metrics_func_provider():
        """Return the accuracy callback run at the end of each epoch."""

        def single_dataset_provider(datapath):
            # One evaluation dataset per path, named after that path.
            cfg = get_args()
            tok = get_tokenizer()
            dataset_name = name_from_datapath_func(datapath)
            return Dataset(dataset_name, [datapath], tok, cfg.seq_length)

        return accuracy_func_provider(single_dataset_provider)

    # Finetune/evaluate.
    finetune(train_valid_datasets_provider, model_provider,
             end_of_epoch_callback_provider=metrics_func_provider)
def _dataset_name_from_datapath(datapath, task):
    """Derive a short dataset name from a GLUE ``.tsv`` path.

    Takes whatever follows the last occurrence of *task* in *datapath*,
    drops a trailing ``.tsv`` extension, trims surrounding slashes and
    turns underscores into dashes.

    ``str.strip('.tsv')`` is deliberately not used: it removes any of the
    characters ``. t s v`` from both ends, so ``/test.tsv`` would come out
    as ``te`` rather than ``test``.
    """
    suffix = '.tsv'
    name = datapath.split(task)[-1]
    if name.endswith(suffix):
        name = name[:-len(suffix)]
    return name.strip('/').replace('_', '-')


def main():
    """Select the GLUE task from ``args.task`` and run finetuning.

    Raises:
        NotImplementedError: if ``args.task`` is neither ``MNLI`` nor
            ``QQP``.
    """
    args = get_args()
    task = args.task

    # The dataset module is imported lazily so only the selected task's
    # module is loaded.
    if task == 'MNLI':
        num_classes = 3
        from tasks.glue.mnli import MNLIDataset as Dataset
    elif task == 'QQP':
        num_classes = 2
        from tasks.glue.qqp import QQPDataset as Dataset
    else:
        raise NotImplementedError('GLUE task {} is not implemented.'.format(
            args.task))

    def name_from_datapath(datapath):
        return _dataset_name_from_datapath(datapath, task)

    glue_classification(num_classes, Dataset, name_from_datapath)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MNLI dataset."""
from megatron import print_rank_0
from tasks.data_utils import clean_text
from .data import GLUEAbstractDataset
LABELS = {'contradiction': 0, 'entailment': 1, 'neutral': 2}
class MNLIDataset(GLUEAbstractDataset):
    """MNLI sentence-pair dataset read from tab-separated files.

    A file whose header row has exactly 10 columns is treated as an
    unlabeled test file; every sample from it receives ``test_label``.
    """

    def __init__(self, name, datapaths, tokenizer, max_seq_length,
                 test_label='contradiction'):
        # NOTE(review): stored before the base constructor runs, presumably
        # because GLUEAbstractDataset.__init__ calls
        # process_samples_from_single_path, which reads it -- confirm.
        self.test_label = test_label
        super().__init__('MNLI', name, datapaths,
                         tokenizer, max_seq_length)

    def process_samples_from_single_path(self, filename):
        """Parse one MNLI ``.tsv`` file into a list of sample dicts.

        Column 0 is the sample id, columns 8 and 9 the two sentences and
        the last column the label name. The first line is a header and is
        only used to detect the file type and to log the column names.

        Returns:
            A list of dicts with keys ``text_a``, ``text_b``, ``label``
            (the integer from ``LABELS``) and ``uid``.
        """
        print_rank_0(' > Processing {} ...'.format(filename))

        samples = []
        is_test = False
        with open(filename, 'r') as tsv_file:
            for line_index, line in enumerate(tsv_file):
                row = line.strip().split('\t')

                if line_index == 0:
                    # Header row: a 10-column header marks a test file.
                    is_test = len(row) == 10
                    id_col = row[0].strip()
                    a_col = row[8].strip()
                    b_col = row[9].strip()
                    if is_test:
                        print_rank_0(
                            ' reading {}, {} and {} columns and setting '
                            'labels to {}'.format(
                                id_col, a_col, b_col, self.test_label))
                    else:
                        print_rank_0(' reading {} , {}, {}, and {} columns '
                                     '...'.format(
                                         id_col, a_col, b_col,
                                         row[-1].strip()))
                    continue

                text_a = clean_text(row[8].strip())
                text_b = clean_text(row[9].strip())
                unique_id = int(row[0].strip())
                label = self.test_label if is_test else row[-1].strip()

                assert len(text_a) > 0
                assert len(text_b) > 0
                assert label in LABELS
                assert unique_id >= 0

                samples.append({'text_a': text_a,
                                'text_b': text_b,
                                'label': LABELS[label],
                                'uid': unique_id})

                # Progress report every 50000 accepted samples.
                if len(samples) % 50000 == 0:
                    print_rank_0(' > processed {} so far ...'.format(
                        len(samples)))

        print_rank_0(' >> processed {} samples.'.format(len(samples)))
        return samples
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""QQP dataset."""
from megatron import print_rank_0
from tasks.data_utils import clean_text
from .data import GLUEAbstractDataset
LABELS = [0, 1]
class QQPDataset(GLUEAbstractDataset):
    """QQP question-pair dataset read from tab-separated files.

    A file whose header row has exactly 3 columns is treated as an
    unlabeled test file; every sample from it receives ``test_label``.
    Otherwise the header must have 6 columns.
    """

    def __init__(self, name, datapaths, tokenizer, max_seq_length,
                 test_label=0):
        # NOTE(review): stored before the base constructor runs, presumably
        # because GLUEAbstractDataset.__init__ calls
        # process_samples_from_single_path, which reads it -- confirm.
        self.test_label = test_label
        super().__init__('QQP', name, datapaths,
                         tokenizer, max_seq_length)

    def process_samples_from_single_path(self, filename):
        """Parse one QQP ``.tsv`` file into a list of sample dicts.

        Test files: columns 0, 1, 2 are id, first and second question.
        Labeled files: columns 0, 3, 4, 5 are id, first question, second
        question and integer label. In labeled files, rows with a column
        count other than 6 or with an empty question are logged and
        skipped; in test files the same conditions are asserted.

        Returns:
            A list of dicts with keys ``uid``, ``text_a``, ``text_b`` and
            ``label``.
        """
        print_rank_0(' > Processing {} ...'.format(filename))

        samples = []
        is_test = False
        with open(filename, 'r') as tsv_file:
            for line_index, line in enumerate(tsv_file):
                row = line.strip().split('\t')

                if line_index == 0:
                    # Header row: a 3-column header marks a test file.
                    is_test = len(row) == 3
                    if is_test:
                        print_rank_0(' reading {}, {}, and {} columns and '
                                     'setting labels to {}'.format(
                                         row[0].strip(), row[1].strip(),
                                         row[2].strip(), self.test_label))
                    else:
                        assert len(row) == 6
                        print_rank_0(' reading {}, {}, {}, and {} columns'
                                     ' ...'.format(
                                         row[0].strip(), row[3].strip(),
                                         row[4].strip(), row[5].strip()))
                    continue

                if is_test:
                    assert len(row) == 3, 'expected length 3: {}'.format(row)
                    uid = int(row[0].strip())
                    text_a = clean_text(row[1].strip())
                    text_b = clean_text(row[2].strip())
                    label = self.test_label
                    assert len(text_a) > 0
                    assert len(text_b) > 0
                elif len(row) != 6:
                    print_rank_0('***WARNING*** index error, '
                                 'skipping: {}'.format(row))
                    continue
                else:
                    uid = int(row[0].strip())
                    text_a = clean_text(row[3].strip())
                    text_b = clean_text(row[4].strip())
                    label = int(row[5].strip())
                    if len(text_a) == 0:
                        print_rank_0('***WARNING*** zero length a, '
                                     'skipping: {}'.format(row))
                        continue
                    if len(text_b) == 0:
                        print_rank_0('***WARNING*** zero length b, '
                                     'skipping: {}'.format(row))
                        continue

                assert label in LABELS
                assert uid >= 0

                samples.append({'uid': uid,
                                'text_a': text_a,
                                'text_b': text_b,
                                'label': label})

                # Progress report every 50000 accepted samples.
                if len(samples) % 50000 == 0:
                    print_rank_0(' > processed {} so far ...'.format(
                        len(samples)))

        print_rank_0(' >> processed {} samples.'.format(len(samples)))
        return samples
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Main tasks functionality."""
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir)))
from megatron import get_args
from megatron.initialize import initialize_megatron
def get_tasks_args(parser):
    """Register the downstream-task command-line options on *parser*.

    Adds a ``tasks`` argument group with the task name (required), the
    number of epochs, the pretrained checkpoint path, data paths and the
    zero-shot evaluation switches.

    Args:
        parser: an ``argparse.ArgumentParser`` (or compatible) instance.

    Returns:
        The same *parser*, to allow chaining.
    """
    group = parser.add_argument_group(title='tasks')

    group.add_argument('--task', type=str, required=True,
                       help='Task name.')
    group.add_argument('--epochs', type=int, default=None,
                       help='Number of finetunning epochs. Zero results in '
                       'evaluation only.')
    group.add_argument('--pretrained-checkpoint', type=str, default=None,
                       help='Pretrained checkpoint used for finetunning.')
    # The first fragment ends with a space so the adjacent literals do not
    # concatenate into "inthe data loader".
    group.add_argument('--keep-last', action='store_true',
                       help='Keep the last batch (maybe incomplete) in '
                       'the data loader')
    group.add_argument('--train-data', nargs='+', default=None,
                       help='Whitespace separated paths or corpora names '
                       'for training.')
    group.add_argument('--valid-data', nargs='*', default=None,
                       help='path(s) to the validation data.')
    group.add_argument('--overlapping-eval', type=int, default=32,
                       help='Sliding window for overlapping evaluation.')
    group.add_argument('--strict-lambada', action='store_true',
                       help='Use more difficult formulation of lambada.')

    return parser
if __name__ == '__main__':
    # Set up Megatron with the extra task-specific command-line options.
    initialize_megatron(extra_args_provider=get_tasks_args)
    args = get_args()

    # Import only the entry point of the requested task family.
    if args.task == 'RACE':
        from race.finetune import main
    elif args.task in ('MNLI', 'QQP'):
        from glue.finetune import main
    elif args.task in ('LAMBADA', 'WIKITEXT103'):
        from zeroshot_gpt2.evaluate import main
    else:
        raise NotImplementedError('Task {} is not implemented.'.format(
            args.task))

    main()
import glob
import json
import os
import time
from torch.utils.data import Dataset
from megatron import print_rank_0
from tasks.data_utils import build_sample
from tasks.data_utils import build_tokens_types_paddings_from_ids
from tasks.data_utils import clean_text
NUM_CHOICES = 4
MAX_QA_LENGTH = 128
class RaceDataset(Dataset):
    """In-memory RACE dataset built from one or more data directories.

    All samples are produced up front by ``process_single_datapath`` and
    kept in ``self.samples``; indexing simply returns the stored sample.
    """

    def __init__(self, dataset_name, datapaths, tokenizer, max_seq_length,
                 max_qa_length=MAX_QA_LENGTH):
        self.dataset_name = dataset_name
        print_rank_0(' > building RACE dataset for {}:'.format(
            self.dataset_name))

        # One log line listing every path, each preceded by a space.
        print_rank_0(' > paths:' + ''.join(' ' + path for path in datapaths))

        self.samples = []
        for datapath in datapaths:
            self.samples += process_single_datapath(
                datapath, tokenizer, max_qa_length, max_seq_length)

        print_rank_0(' >> total number of samples: {}'.format(
            len(self.samples)))

    def __len__(self):
        """Number of samples loaded from all data paths."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return the pre-built sample stored at position *idx*."""
        return self.samples[idx]
def process_single_datapath(datapath, tokenizer, max_qa_length, max_seq_length):
    """Turn every ``*.txt`` file under *datapath* into RACE samples.

    Each line of each file is parsed as a JSON document with the keys
    ``article``, ``questions``, ``options`` and ``answers``. For every
    question, one token sequence is built per answer choice and the
    ``NUM_CHOICES`` sequences are bundled into a single sample via
    ``build_sample``.

    Args:
        datapath: directory searched (non-recursively) for ``*.txt`` files.
        tokenizer: object providing ``tokenize`` plus ``cls``, ``sep`` and
            ``pad`` attributes.
        max_qa_length: question+choice token lists longer than this are
            truncated to it.
        max_seq_length: forwarded to
            ``build_tokens_types_paddings_from_ids``.

    Returns:
        The list of samples, in processing order.
    """
    print_rank_0(' > working on {}'.format(datapath))
    began_at = time.time()

    samples = []
    doc_count = 0
    question_count = 0
    sample_count = 0

    for txt_path in glob.glob(os.path.join(datapath, '*.txt')):
        with open(txt_path, 'r') as handle:
            for raw_line in handle:
                record = json.loads(raw_line)
                doc_count += 1

                article = record["article"]
                questions = record["questions"]
                options = record["options"]
                answers = record["answers"]

                # Questions, answers and option lists must line up.
                assert len(questions) == len(answers)
                assert len(questions) == len(options)

                # The article is cleaned and tokenized once per document.
                article_ids = tokenizer.tokenize(clean_text(article))

                for q_index, question in enumerate(questions):
                    question_count += 1

                    # Answers are letters; 'A' maps to label 0.
                    label = ord(answers[q_index]) - ord("A")
                    assert label >= 0
                    assert label < NUM_CHOICES
                    assert len(options[q_index]) == NUM_CHOICES

                    ids_list, types_list, paddings_list = [], [], []
                    for c_index in range(NUM_CHOICES):
                        choice = options[q_index][c_index]

                        # Fill the blank if the question has one,
                        # otherwise append the choice to the question.
                        qa = (question.replace("_", choice)
                              if "_" in question
                              else " ".join([question, choice]))

                        qa_ids = tokenizer.tokenize(clean_text(qa))
                        if len(qa_ids) > max_qa_length:
                            qa_ids = qa_ids[0:max_qa_length]

                        ids, types, paddings = \
                            build_tokens_types_paddings_from_ids(
                                qa_ids, article_ids, max_seq_length,
                                tokenizer.cls, tokenizer.sep, tokenizer.pad)

                        ids_list.append(ids)
                        types_list.append(types)
                        paddings_list.append(paddings)

                    # The running sample count is passed to build_sample
                    # as its last argument.
                    samples.append(build_sample(ids_list, types_list,
                                                paddings_list, label,
                                                sample_count))
                    sample_count += 1

    elapsed = time.time() - began_at
    print_rank_0(' > processed {} document, {} questions, and {} samples'
                 ' in {:.2f} seconds'.format(doc_count, question_count,
                                             sample_count, elapsed))
    return samples
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Race."""
from megatron import get_args
from megatron import print_rank_0
from megatron import get_tokenizer
from megatron.model.multiple_choice import MultipleChoice
from tasks.eval_utils import accuracy_func_provider
from tasks.finetune_utils import finetune
from tasks.race.data import RaceDataset
def train_valid_datasets_provider():
    """Build the RACE training and validation datasets.

    Returns:
        A ``(train_dataset, valid_dataset)`` tuple of ``RaceDataset``
        objects built from ``args.train_data`` and ``args.valid_data``.
    """
    cfg = get_args()
    tok = get_tokenizer()

    splits = (('training', cfg.train_data),
              ('validation', cfg.valid_data))
    train_dataset, valid_dataset = [
        RaceDataset(split_name, paths, tok, cfg.seq_length)
        for split_name, paths in splits]
    return train_dataset, valid_dataset
def model_provider():
    """Return a ``MultipleChoice`` model with two token types for RACE."""
    print_rank_0('building multichoice model for RACE ...')
    race_model = MultipleChoice(num_tokentypes=2)
    return race_model
def metrics_func_provider():
    """Provide the accuracy callback used for RACE evaluation.

    The arguments and tokenizer are fetched once here and shared by every
    evaluation dataset built through the inner provider.
    """
    cfg = get_args()
    tok = get_tokenizer()

    def single_dataset_provider(datapath):
        # Name the dataset after the path portion following 'RACE',
        # with inner slashes turned into dashes.
        tail = datapath.split('RACE')[-1]
        dataset_name = tail.strip('/').replace('/', '-')
        return RaceDataset(dataset_name, [datapath], tok, cfg.seq_length)

    return accuracy_func_provider(single_dataset_provider)
def main():
    """Run RACE finetuning with the providers defined in this module."""
    providers = (train_valid_datasets_provider, model_provider)
    finetune(*providers,
             end_of_epoch_callback_provider=metrics_func_provider)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zero-shot datasets."""
import json
import math
import numpy as np
import torch
from megatron import get_args
from megatron import print_rank_0
from megatron import get_tokenizer
from .detokenizer import get_detokenizer
def build_dataset(task):
    """Build the zero-shot evaluation dataset for *task*.

    Args:
        task: ``'LAMBADA'`` or ``'WIKITEXT103'``.

    Raises:
        NotImplementedError: for any other task name.
    """
    if task == 'LAMBADA':
        builder = _build_lambada_dataset
    elif task == 'WIKITEXT103':
        builder = _build_wikitext103_dataset
    else:
        raise NotImplementedError('dataset for {} task is not '
                                  'implemented.'.format(task))
    return builder()
class _LMDataset(torch.utils.data.Dataset):
    """Sliding-window language-modelling dataset over one token sequence.

    Item ``idx`` starts at ``idx * overalapping_eval`` and spans up to
    ``seq_len + 1`` tokens (inputs plus the shifted targets). Short windows
    are padded with ``pad_idx`` and masked out in ``pad_mask``.
    """

    def __init__(self, tokens, seq_len, pad_idx, num_original_tokens,
                 num_tokenized_tokens, overalapping_eval=None):
        self.tokens = tokens
        self.seq_len = seq_len
        self.pad_idx = pad_idx

        # The stride defaults to a full window and is never below 1.
        stride = self.seq_len if overalapping_eval is None \
            else overalapping_eval
        self.overalapping_eval = max(1, stride)

        self.num_original_tokens = num_original_tokens
        self.num_tokenized_tokens = num_tokenized_tokens
        self.total_targets = len(self.tokens) - 1

        # Targets left once the first stride is accounted for; each
        # further stride adds one sequence.
        remaining = max(self.total_targets - self.overalapping_eval, 0)
        extra_sequences = math.ceil(remaining / self.overalapping_eval)
        self.total_sequences = max(extra_sequences + 1, 1)

    def __len__(self):
        """Number of windows in the dataset."""
        return self.total_sequences

    def __getitem__(self, idx):
        """Return ``{'text': ..., 'pad_mask': ...}`` for window *idx*.

        ``text`` holds ``seq_len + 1`` token ids; ``pad_mask`` holds
        ``seq_len`` entries aligned with the targets (the first position
        is dropped).
        """
        stride = self.overalapping_eval
        window = self.seq_len + 1
        begin = idx * stride

        tokens = self.tokens[begin:begin + window]
        pad_mask = [1] * len(tokens)

        shortfall = window - len(tokens)
        if shortfall > 0:
            pad_mask += [0] * shortfall
            tokens += [self.pad_idx] * shortfall

        pad_mask = np.array(pad_mask[1:])
        # With overlapping windows, only the last `stride` targets of every
        # window after the first are kept in the mask.
        if stride != self.seq_len and idx != 0:
            pad_mask[:-stride] *= 0

        return {'text': np.array(tokens), 'pad_mask': pad_mask}
class _LambadaDataset(torch.utils.data.Dataset):
    """LAMBADA last-word prediction dataset loaded from a JSON-lines file.

    Every line must be a JSON object with a ``text`` field. Each text is
    split into context tokens and label tokens (see ``get_tokens``).
    """

    def __init__(self, path, pad_idx, tokenizer, seq_len, strict=False):
        print_rank_0('> building lambada dataset from {} ...'.format(path))
        self.seq_len = seq_len
        self.pad_idx = pad_idx
        self.tokenizer = tokenizer
        self.strict = strict

        self.tokens = []
        self.labels = []
        with open(path, 'r') as jsonl_file:
            for raw_line in jsonl_file.readlines():
                passage = json.loads(raw_line)['text']
                context_tokens, label_tokens = self.get_tokens(passage)
                self.tokens.append(context_tokens)
                self.labels.append(label_tokens)

    def get_tokens(self, text):
        """Split *text* into ``(context_tokens, label_tokens)``.

        Non-strict mode: tokenize the whole text; the final token is the
        label. Strict mode: the last whitespace-separated word is tokenized
        on its own (with a leading space) and forms the label, while the
        text before its last occurrence is the context.
        """
        if self.strict:
            last_word = text.split()[-1]
            cut = text.rfind(last_word)
            context_tokens = self.tokenizer.tokenize(text[:cut].strip())
            label_tokens = self.tokenizer.tokenize(' ' + last_word)
            return context_tokens, label_tokens

        all_tokens = self.tokenizer.tokenize(text)
        return all_tokens[:-1], [all_tokens[-1]]

    def __len__(self):
        """Number of passages loaded."""
        return len(self.tokens)

    def __getitem__(self, idx):
        """Return ``{'text': ..., 'pad_mask': ...}`` for passage *idx*.

        ``pad_mask`` is 1 only on the label positions; its first entry is
        dropped so it aligns with the shifted targets.
        """
        context_tokens = self.tokens[idx]
        label_tokens = self.labels[idx]

        pad_mask = [0] * len(context_tokens)
        pad_mask += [1] * len(label_tokens)
        # Concatenation creates a new list, so the stored tokens are not
        # modified by the padding below.
        tokens = context_tokens + label_tokens

        shortfall = self.seq_len + 1 - len(tokens)
        if shortfall > 0:
            pad_mask += [0] * shortfall
            tokens += [self.pad_idx] * shortfall

        pad_mask = np.array(pad_mask[1:])
        return {'text': np.array(tokens), 'pad_mask': pad_mask}
def _build_lambada_dataset():
    """Build the LAMBADA dataset from the single ``--valid-data`` path.

    Padding uses the tokenizer's ``eod`` id; strict mode follows
    ``args.strict_lambada``.
    """
    cfg = get_args()
    tok = get_tokenizer()

    assert len(cfg.valid_data) == 1
    data_path = cfg.valid_data[0]
    lambada = _LambadaDataset(data_path, tok.eod, tok,
                              cfg.seq_length, cfg.strict_lambada)
    print_rank_0(' > found {} samples.'.format(len(lambada)))
    return lambada
def _build_wikitext103_dataset():
    """Build the WikiText-103 dataset from the single ``--valid-data`` path.

    The file is read as UTF-8. The original token count is the number of
    space-separated pieces of the stripped raw text; the text is then
    detokenized (detokenizer chosen from the path) and tokenized.
    """
    cfg = get_args()
    tok = get_tokenizer()

    assert len(cfg.valid_data) == 1
    data_path = cfg.valid_data[0]
    with open(data_path, "rb") as reader:
        raw_text = reader.read().decode('utf-8')

    num_original_tokens = len(raw_text.strip().split(" "))

    detokenize = get_detokenizer(data_path)
    token_ids = tok.tokenize(detokenize(raw_text))
    num_tokenized_tokens = len(token_ids)

    wikitext = _LMDataset(token_ids, cfg.seq_length, tok.eod,
                          num_original_tokens, num_tokenized_tokens,
                          cfg.overlapping_eval)
    print_rank_0(' > number of original tokens: {}, number of detokenized '
                 'tokens: {}'.format(num_original_tokens, num_tokenized_tokens))
    return wikitext
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Detokenization."""
import re
def ptb_detokenizer(string):
    """Undo Penn Treebank style tokenization artifacts in *string*.

    The literal substitutions below are applied one after another, in
    order; later ones see the output of earlier ones.
    """
    substitutions = (
        (" '", "'"),
        (" \n", "\n"),
        ("\n ", "\n"),
        (" n't", "n't"),
        (" N ", "1 "),
        ("$ 1", "$1"),
        ("# 1", "#1"),
    )
    for old, new in substitutions:
        string = string.replace(old, new)
    return string
def wikitext_detokenizer(string):
    """Undo WikiText tokenization artifacts in *string*.

    Literal and regex substitutions are applied strictly in the order
    listed; later ones see the output of earlier ones.
    """

    def _replace_each(text, pairs):
        # Apply literal replacements sequentially.
        for old, new in pairs:
            text = text.replace(old, new)
        return text

    # contractions
    string = string.replace("s '", "s'")
    # NOTE(review): the slashes here are literal characters, so this only
    # matches text such as "/' 5/" and inserts the literal "[0-9]".
    # Confirm whether a capture-group form was intended.
    string = re.sub(r"/' [0-9]/", r"/'[0-9]/", string)

    string = _replace_each(string, (
        # number separators
        (" @-@ ", "-"),
        (" @,@ ", ","),
        (" @.@ ", "."),
        # punctuation
        (" : ", ": "),
        (" ; ", "; "),
        (" . ", ". "),
        (" ! ", "! "),
        (" ? ", "? "),
        (" , ", ", "),
    ))

    # Trim whitespace just inside paired brackets and quotes.
    bracket_rules = (
        (r"\(\s*([^\)]*?)\s*\)", r"(\1)"),
        (r"\[\s*([^\]]*?)\s*\]", r"[\1]"),
        (r"{\s*([^}]*?)\s*}", r"{\1}"),
        (r"\"\s*([^\"]*?)\s*\"", r'"\1"'),
        (r"'\s*([^']*?)\s*'", r"'\1'"),
    )
    for pattern, replacement in bracket_rules:
        string = re.sub(pattern, replacement, string)

    # miscellaneous
    degree = chr(176)
    string = _replace_each(string, (
        ("= = = =", "===="),
        ("= = =", "==="),
        ("= =", "=="),
        (" " + degree + " ", degree),
        (" \n", "\n"),
        ("\n ", "\n"),
        (" N ", " 1 "),
        (" 's", "'s"),
    ))
    return string
def lambada_detokenizer(string):
    """Return *string* unchanged; LAMBADA text gets no detokenization."""
    return string
# Maps a substring looked for in a dataset path to the detokenizer used for
# that dataset. get_detokenizer scans the keys in the order written here.
_DETOKENIZERS = {
'ptb': ptb_detokenizer,
'wiki': wikitext_detokenizer,
'lambada': lambada_detokenizer,
}
def get_detokenizer(path):
    """Pick a detokenizer based on substrings of *path*.

    Returns the function registered under the first ``_DETOKENIZERS`` key
    that occurs in *path*, or ``None`` when no key matches.
    """
    for key, detokenizer in _DETOKENIZERS.items():
        if key in path:
            return detokenizer
    return None
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GPT2 zero-shot evaluation."""
import math
import torch
from megatron import get_args
from megatron import print_rank_0
from megatron import get_tokenizer
from megatron import mpu
from megatron.checkpointing import load_checkpoint
from megatron.model import GPT2Model
from megatron.training import get_model
from megatron.utils import get_ltor_masks_and_position_ids
from tasks.finetune_utils import build_data_loader
from .datasets import build_dataset
def get_model_provider(eval_metric):
    """Return a GPT2 model provider configured for *eval_metric*.

    The metric selects the ``parallel_output`` flag of ``GPT2Model``:
    ``'loss'`` uses ``True`` and ``'accuracy'`` uses ``False``.
    """
    parallel_output_by_metric = {'loss': True, 'accuracy': False}

    def model_provider():
        """Build the model.

        Raises:
            NotImplementedError: if the metric is not supported (raised
                when the provider is called, before anything is built).
        """
        if eval_metric not in parallel_output_by_metric:
            raise NotImplementedError('output type for {} evaluation metric '
                                      'is not supported.'.format(eval_metric))
        parallel_output = parallel_output_by_metric[eval_metric]

        print_rank_0('building GPT2 model ...')
        return GPT2Model(num_tokentypes=0, parallel_output=parallel_output)

    return model_provider
def process_batch(batch):
    """Convert a data-loader batch into model inputs on the GPU.

    Args:
        batch: mapping with ``'text'`` and ``'pad_mask'`` tensors.

    Returns:
        ``(tokens, labels, attention_mask, position_ids, loss_mask)``,
        where ``tokens`` drops the last column of ``text`` and ``labels``
        drops the first.
    """
    cfg = get_args()
    tok = get_tokenizer()

    loss_mask = batch['pad_mask'].long().cuda().contiguous().byte()
    full_sequence = batch['text'].long().cuda().contiguous()

    # Targets are the inputs shifted left by one position.
    labels = full_sequence[:, 1:].contiguous()
    tokens = full_sequence[:, :-1].contiguous()

    # The second value returned by the helper is not used here.
    attention_mask, _, position_ids = get_ltor_masks_and_position_ids(
        tokens,
        tok.eod,
        cfg.reset_position_ids,
        cfg.reset_attention_mask,
        cfg.eod_mask_loss)

    return tokens, labels, attention_mask, position_ids, loss_mask
def forward_step(batch, model, eval_metric):
    """Run the model on one batch and return the metric contribution.

    Returns:
        For ``'loss'``: the sum of per-token losses over positions where
        the loss mask is set. For ``'accuracy'``: the number of rows whose
        masked positions are all predicted correctly.

    Raises:
        NotImplementedError: for any other metric (after the forward pass).
    """
    tokens, labels, attention_mask, position_ids, loss_mask = \
        process_batch(batch)

    output = model(tokens, position_ids, attention_mask)

    if eval_metric == 'loss':
        per_token_loss = mpu.vocab_parallel_cross_entropy(
            output.contiguous().float(), labels.contiguous())
        flat_mask = loss_mask.contiguous().view(-1).float()
        return torch.sum(per_token_loss.view(-1) * flat_mask)

    if eval_metric == 'accuracy':
        predictions = torch.argmax(output, -1)
        hits = (predictions == labels).float()
        # Unmasked positions count as correct so they do not affect the
        # per-row product.
        hits[(1 - loss_mask).bool()] = 1
        return hits.prod(-1).sum()

    raise NotImplementedError('forward method for evaluation metric {} '
                              'is not implemented.'.format(eval_metric))
def evaluate(data_loader, model, eval_metric):
    """Accumulate the metric over every batch of *data_loader*.

    The model is switched to eval mode, gradients are disabled, and each
    batch result is all-reduced over the data-parallel group before being
    added to the running total.

    Returns:
        The accumulated total (the float ``0.0`` if there are no batches).
    """
    cfg = get_args()

    # Evaluation mode disables dropout.
    model.eval()

    total_output = 0.0
    with torch.no_grad():
        for step, batch in enumerate(data_loader):
            if step % cfg.log_interval == 0:
                print_rank_0('> working on iteration: {}'.format(step))

            batch_output = forward_step(batch, model, eval_metric)

            # Reduce across processes.
            torch.distributed.all_reduce(
                batch_output, group=mpu.get_data_parallel_group())

            total_output += batch_output

    return total_output
def evaluate_and_print_results(task, data_loader, model, eval_metric):
    """Evaluate the model and print a one-line summary between rules.

    For ``'loss'`` the summary holds the average loss, perplexity,
    token-ratio-adjusted perplexity and the token ratio; exponents are
    capped at 20. For ``'accuracy'`` it holds the number correct, the
    number of examples and their ratio.

    Raises:
        NotImplementedError: for any other metric (after evaluation ran).
    """
    output = evaluate(data_loader, model, eval_metric)

    parts = [' validation results on {} | '.format(task)]
    if eval_metric == 'loss':
        num_tokenized_tokens = data_loader.dataset.num_tokenized_tokens
        num_original_tokens = data_loader.dataset.num_original_tokens
        # One target fewer than tokens in both counts.
        val_loss = output / (num_tokenized_tokens - 1)
        ppl = math.exp(min(20, val_loss))
        token_ratio = (num_tokenized_tokens - 1) / (num_original_tokens - 1)
        adjusted_ppl = math.exp(min(20, val_loss * token_ratio))
        parts.append('avg loss: {:.4E} | '.format(val_loss))
        parts.append('ppl: {:.4E} | '.format(ppl))
        parts.append('adjusted ppl: {:.4E} | '.format(adjusted_ppl))
        parts.append('token ratio: {} |'.format(token_ratio))
    elif eval_metric == 'accuracy':
        num_examples = len(data_loader.dataset)
        acc = output / num_examples
        parts.append('number correct: {:.4E} | '.format(output))
        parts.append('total examples: {:.4E} | '.format(num_examples))
        parts.append('avg accuracy: {:.4E}'.format(acc))
    else:
        raise NotImplementedError('evaluation method for {} metric is not '
                                  'implemented yet.'.format(eval_metric))

    summary = ''.join(parts)
    rule = '-' * (len(summary) + 1)
    print_rank_0(rule)
    print_rank_0(summary)
    print_rank_0(rule)
def main():
    """Run GPT2 zero-shot evaluation for the task in ``args.task``.

    LAMBADA is scored by accuracy and WIKITEXT103 by loss.

    Raises:
        NotImplementedError: for any other task.
    """
    args = get_args()

    metric_by_task = {'LAMBADA': 'accuracy', 'WIKITEXT103': 'loss'}
    if args.task not in metric_by_task:
        raise NotImplementedError('{} task is not implemented.'.format(
            args.task))
    eval_metric = metric_by_task[args.task]

    # Set up model and load checkpoint.
    model = get_model(get_model_provider(eval_metric))
    if args.load is not None:
        load_checkpoint(model, None, None)

    # Data stuff.
    dataset = build_dataset(args.task)
    dataloader = build_data_loader(dataset, args.batch_size,
                                   args.num_workers, drop_last=False)

    # Run evaluation.
    evaluate_and_print_results(args.task, dataloader, model, eval_metric)

    print_rank_0('done :-)')
import sys
sys.path.append('../')
from megatron.indexer import IndexBuilder
from megatron.initialize import initialize_megatron
def main():
    """Build and save a BlockData index by running an IndexBuilder.

    The index is produced by an ``IndexBuilder`` over an ICT dataset. All
    arguments needed for the initial model specification must be supplied.

    Other key args:
        --block-data-path: path to write to
        --ict-load or --realm-load: path to checkpoint with which to embed
        --data-path and --titles-data-path: paths for dataset
        --indexer-log-interval: reporting interval
        --indexer-batch-size: size specific for indexer jobs

    Check README.md for an example script.
    """
    tokenizer_defaults = {'tokenizer_type': 'BertWordPieceLowerCase'}
    initialize_megatron(extra_args_provider=None,
                        args_defaults=tokenizer_defaults)

    IndexBuilder().build_and_save_index()
# Build the index only when this file is executed as a script.
if __name__ == "__main__":
    main()
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample Generate GPT2"""
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir)))
from megatron import get_args
from megatron import print_rank_0
from megatron import get_tokenizer
from megatron.checkpointing import load_checkpoint
from megatron.initialize import initialize_megatron
from megatron.model import GPT2Model
from megatron.training import get_model
from megatron.text_generation_utils import generate_and_write_samples_unconditional
from megatron.text_generation_utils import generate_samples_input_from_file
from megatron.text_generation_utils import generate_samples_interactive
def model_provider():
    """Return a ``GPT2Model`` with no token types and ``parallel_output=False``."""
    print_rank_0('building GPT2 model ...')
    return GPT2Model(num_tokentypes=0, parallel_output=False)
def add_text_generate_args(parser):
    """Register the text-generation command-line options on *parser*.

    Args:
        parser: an ``argparse.ArgumentParser`` (or compatible) instance.

    Returns:
        The same *parser*, to allow chaining.
    """
    group = parser.add_argument_group(title='text generation')

    # (flag, add_argument keyword arguments), in help-output order.
    option_specs = (
        ("--temperature", dict(type=float, default=1.0,
                               help='Sampling temperature.')),
        ("--greedy", dict(action='store_true', default=False,
                          help='Use greedy sampling.')),
        ("--top_p", dict(type=float, default=0.0,
                         help='Top p sampling.')),
        ("--top_k", dict(type=int, default=0,
                         help='Top k sampling.')),
        ("--out-seq-length", dict(type=int, default=1024,
                                  help='Size of the output generated text.')),
        ("--sample-input-file", dict(
            type=str, default=None,
            help='Get input from file instead of interactive mode, '
            'each line is an input.')),
        ("--sample-output-file", dict(
            type=str, default=None,
            help='Output file got from --sample-input-file')),
        ("--num-samples", dict(
            type=int, default=0,
            help='Number of samples to generate unconditionally, '
            'defaults to 0 and interactive conditional sampling')),
        ("--genfile", dict(
            type=str,
            help='Output file when generating unconditionally')),
        ("--recompute", dict(
            action='store_true',
            help='During generation recompute all attention '
            'instead of using previously computed keys/values.')),
    )
    for flag, options in option_specs:
        group.add_argument(flag, **options)

    return parser
def main():
    """Generate GPT2 samples in the mode selected on the command line.

    With ``--num-samples`` non-zero, samples are generated unconditionally.
    Otherwise the batch size is forced to 1 and prompts are read from
    ``--sample-input-file`` when given, or interactively when not.
    """
    initialize_megatron(extra_args_provider=add_text_generate_args,
                        args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})

    # Set up model and load checkpoint.
    model = get_model(model_provider)
    args = get_args()
    if args.load is not None:
        load_checkpoint(model, None, None)

    # Unconditional generation.
    if args.num_samples != 0:
        generate_and_write_samples_unconditional(model)
        return

    # Conditional generation handles one prompt at a time.
    args.batch_size = 1
    if args.sample_input_file is not None:
        generate_samples_input_from_file(model)
    else:
        generate_samples_interactive(model)
# Generate samples only when this file is executed as a script.
if __name__ == "__main__":
    main()
import os
import os.path as osp
import pathlib
import subprocess
def recursively_lint_files():
    """Run autopep8 in place on the repository's Python files.

    Collects ``*.py`` files directly in the parent directory of this
    script and, recursively, in a fixed list of subdirectories, then runs
    ``autopep8`` on each. Returns without linting (after printing an
    install hint) when autopep8 is not importable.

    Raises:
        subprocess.CalledProcessError: if autopep8 exits with a non-zero
            status for a file.
    """
    try:
        # Imported only to check that autopep8 is installed.
        import autopep8  # noqa: F401
    except ModuleNotFoundError:
        print("Please first install autopep8 via `pip install autopep8`")
        return

    # get all python file paths from top level directory
    file_dir = str(pathlib.Path(__file__).parent.absolute())
    working_dir = osp.join(file_dir, os.pardir)
    # endswith() rather than a substring test, so names such as "x.pyc" or
    # "x.py.bak" are not picked up.
    all_py_paths = set(os.path.join(working_dir, fname)
                       for fname in os.listdir(working_dir)
                       if fname.endswith(".py"))

    # get all python file paths from chosen subdirectories
    check_dirs = ['docker', 'megatron', 'openwebtext', 'scripts', 'tasks']
    for sub_dir in check_dirs:
        for path, _, fnames in os.walk(osp.join(working_dir, sub_dir)):
            all_py_paths.update(osp.join(path, fname)
                                for fname in fnames
                                if fname.endswith(".py"))

    print("Linting the following: ")
    # Sorted so the processing order is reproducible.
    for py_path in sorted(all_py_paths):
        print(py_path)
        # An argument list is required: without shell=True a single command
        # string is treated as the program name on POSIX, and a list also
        # keeps paths containing spaces intact.
        command = ['autopep8', '--max-line-length', '100',
                   '--aggressive', '--in-place', py_path]
        subprocess.check_call(command)
if __name__ == "__main__":
recursively_lint_files()
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Merge model parallel partitions."""
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir)))
import torch
from megatron import mpu
from megatron.checkpointing import ensure_directory_exists
from megatron.checkpointing import get_checkpoint_name
from megatron.checkpointing import get_checkpoint_tracker_filename
from megatron.global_vars import rebuild_tokenizer
from megatron.global_vars import _parse_args
def split_into_partitions(tensor, num_partitions, partition_dim, stride):
    """Split ``tensor`` into ``num_partitions`` strided partitions.

    The tensor is cut along ``partition_dim`` into chunks of size
    ``(tensor.size(partition_dim) / num_partitions) / stride`` (both
    divisions go through ``mpu.utils.divide``). Partition ``r`` is the
    concatenation of chunks ``r, r + num_partitions, r + 2*num_partitions``
    and so on.

    Returns:
        A list of ``num_partitions`` tensors.
    """
    partition_size = mpu.utils.divide(tensor.size(partition_dim),
                                      num_partitions)
    chunk_size = mpu.utils.divide(partition_size, stride)
    chunks = torch.split(tensor, chunk_size, dim=partition_dim)

    return [torch.cat(chunks[rank::num_partitions], dim=partition_dim)
            for rank in range(num_partitions)]
def merge_partitions(merged, partitions, partition_dim, stride):
    """Write the combination of ``partitions`` into ``merged`` in place.

    Args:
        merged: destination tensor, overwritten with the merged values.
        partitions: sequence of tensors that must all have the same size
            along ``partition_dim`` (checked with ``assert``).
        partition_dim: dimension along which the partitions are joined.
        stride: ``1`` for plain concatenation. Otherwise every partition is
            first split into chunks of ``per_partition_size / stride`` and
            the chunks of all partitions are interleaved before joining.

    If the joined size exceeds ``merged.size(partition_dim)``, a warning is
    printed and only the leading part that fits is copied.
    """
    num_partitions = len(partitions)

    # Every partition has to agree on its size along the partition dim.
    per_partition_size = None
    for part in partitions:
        size = part.size(partition_dim)
        if per_partition_size is None:
            per_partition_size = size
        else:
            assert per_partition_size == size

    def _concat_into_merged(pieces):
        """Concatenate ``pieces`` and store the result in ``merged``."""
        total = per_partition_size * num_partitions
        target = merged.size(partition_dim)
        with torch.no_grad():
            if total == target:
                torch.cat(pieces, dim=partition_dim, out=merged)
                return
            print(' ***WARNING*** sizes do not match. Will cut '
                  'the merged partitions by {} along dimension {} '
                  'to reduce the size from {} to {} ...'.format(
                      total - target, partition_dim, total, target))
            joined = torch.cat(pieces, dim=partition_dim)
            # Keep only the first block of the target size.
            trimmed = torch.split(joined, target, dim=partition_dim)[0]
            assert trimmed.size(partition_dim) == target
            merged.data.copy_(trimmed.data)

    # Unit stride: the partitions are simply laid out one after another.
    if stride == 1:
        _concat_into_merged(partitions)
        return

    # Non-unit stride: chunk each partition, then interleave the chunks so
    # that chunk j of partition i lands at position j * num_partitions + i.
    chunk_size = mpu.utils.divide(per_partition_size, stride)
    interleaved = None
    for rank, part in enumerate(partitions):
        pieces = torch.split(part, chunk_size, dim=partition_dim)
        if interleaved is None:
            interleaved = [0] * (num_partitions * len(pieces))
        interleaved[rank::num_partitions] = pieces

    _concat_into_merged(interleaved)
    return
def get_model(model_type):
    """Build the model for ``model_type`` and convert it with ``.half()``.

    Args:
        model_type: one of ``'BERT'``, ``'GPT2'``, ``'RACE'``, ``'MNLI'`` or
            ``'QQP'``.

    Returns:
        The object returned by the selected ``model_provider`` after calling
        ``.half()`` on it.

    Raises:
        ValueError: if ``model_type`` is not one of the supported names.
    """
    if model_type == 'BERT':
        from pretrain_bert import model_provider
    elif model_type == 'GPT2':
        from pretrain_gpt2 import model_provider
    elif model_type == 'RACE':
        from tasks.race.finetune import model_provider
    # Membership test: comparing the string to a list with `==` is never
    # true, which made MNLI and QQP fall through to the error branch.
    elif model_type in ('MNLI', 'QQP'):
        # MNLI uses three classes, QQP two.
        num_classes = 3 if model_type == 'MNLI' else 2
        from megatron.model.classification import Classification

        def model_provider():
            return Classification(num_classes=num_classes, num_tokentypes=2)
    else:
        raise ValueError('unrecognized model type: {}'.format(model_type))

    model = model_provider()
    model = model.half()

    return model
def get_parallel_checkpoint_name(path):
    """Return ``(checkpoint_name, iteration)`` for the checkpoint in ``path``.

    The iteration is read as an integer from the tracker file given by
    ``get_checkpoint_tracker_filename(path)`` and must be positive.
    """
    tracker_path = get_checkpoint_tracker_filename(path)
    with open(tracker_path, 'r') as tracker_file:
        iteration = int(tracker_file.read().strip())
    assert iteration > 0

    return get_checkpoint_name(path, iteration), iteration
def test_split_merge():
    """Round-trip a small tensor through split and merge and print the error.

    A 12x5 tensor is split into 2 partitions along dimension 0 with stride 3,
    merged back into a zero tensor of the same shape, and the maximum
    absolute difference to the input is printed.
    """
    print('testing split and merge ...')

    #[QKV.ROW-COL]
    rows = [[1.11, 1.12, 1.13, 1.14, 1.15],
            [1.21, 1.22, 1.23, 1.24, 1.25],
            [1.31, 1.32, 1.33, 1.34, 1.35],
            [1.41, 1.42, 1.43, 1.44, 1.45],
            [2.11, 2.12, 2.13, 2.14, 2.15],
            [2.21, 2.22, 2.23, 2.24, 2.25],
            [2.31, 2.32, 2.33, 2.34, 2.35],
            [2.41, 2.42, 2.43, 2.44, 2.45],
            [3.11, 3.12, 3.13, 3.14, 3.15],
            [3.21, 3.22, 3.23, 3.24, 3.25],
            [3.31, 3.32, 3.33, 3.34, 3.35],
            [3.41, 3.42, 3.43, 3.44, 3.45]]
    tensor = torch.FloatTensor(rows)

    num_partitions, partition_dim, stride = 2, 0, 3
    pieces = split_into_partitions(tensor, num_partitions,
                                   partition_dim, stride)

    merged = torch.zeros_like(tensor)
    merge_partitions(merged, pieces, partition_dim, stride)

    max_error = (merged - tensor).abs().max()
    print(' > max error (should be zero): {}'.format(max_error))
def get_mp_merge_args(parser):
    """Add the required ``--model-type`` option used when merging.

    Args:
        parser: an ``argparse`` parser.

    Returns:
        The same ``parser`` object.
    """
    supported_types = ['BERT', 'GPT2', 'RACE', 'MNLI', 'QQP']
    merge_group = parser.add_argument_group(title='mp merge')
    merge_group.add_argument('--model-type', type=str, required=True,
                             choices=supported_types,
                             help='Type of the mdoel.')

    return parser
def main():
    """Merge the model-parallel partitions of a checkpoint into one model.

    Builds one model with model-parallel size 1 and one model per original
    model-parallel rank (each loaded from the checkpoint found under
    ``args.load``). Walks the named parameters of all models in lock-step,
    copying non-parallel parameters from rank 0 and merging parallel ones
    with ``merge_partitions``. Finally saves the merged state dict at the
    checkpoint name derived from ``<args.load>/merged``.
    """
    # Args
    args = _parse_args(extra_args_provider=get_mp_merge_args)
    model_type = args.model_type
    original_mp_size = args.model_parallel_size
    args.model_parallel_size = 1
    tokenizer = rebuild_tokenizer(args)

    print('\n merging model parallel partitions ...')
    print(' > number of partitions: {}'.format(original_mp_size))
    print(' > checkpoint path: {}'.format(args.load))
    print(' > model parameters:')
    print(' number of tokens ................ {} '.format(
        tokenizer.vocab_size))
    print(' number of layers ................ {}'.format(args.num_layers))
    print(' hidden sise ..................... {}'.format(args.hidden_size))
    print(' number of attention heads ....... {}'.format(
        args.num_attention_heads))
    print(' maximum position embeddings ..... {}'.format(
        args.max_position_embeddings))

    # Full model.
    print('> building the full model ...')
    mpu.initialize.set_model_parallel_world_size(1)
    mpu.initialize.set_model_parallel_rank(0)
    merged_model = get_model(model_type)

    # Build and load partitions.
    partitions = []
    iteration = 0
    args.model_parallel_size = original_mp_size
    tokenizer = rebuild_tokenizer(args)
    mpu.initialize.set_model_parallel_world_size(args.model_parallel_size)
    for rank in range(args.model_parallel_size):
        mpu.initialize.set_model_parallel_rank(rank)
        checkpoint_name, iteration = get_parallel_checkpoint_name(args.load)
        print('> loading {} ...'.format(checkpoint_name))
        partition_model = get_model(model_type)
        state = torch.load(checkpoint_name, map_location='cpu')
        partition_model.load_state_dict(state['model'])
        partitions.append(partition_model)

    # Parameter generators so we can loop through them simultaneously.
    merged_params_gen = merged_model.named_parameters()
    partitions_params_gen = [model.named_parameters()
                             for model in partitions]
    while True:
        try:
            # Get the params and check names.
            name, merged_param = next(merged_params_gen)
            print(' > working on {} ...'.format(name))
            print(' merged type: {}, size: {}'.format(
                merged_param.dtype, list(merged_param.size())))
            partitions_param = []
            for rank, params_gen in enumerate(partitions_params_gen):
                partition_name, partition_param = next(params_gen)
                assert partition_name == name
                partitions_param.append(partition_param)
                print(' partition {} type: {}, size: {}'.format(
                    rank, partition_param.dtype,
                    list(partition_param.size())))

            if hasattr(merged_param, 'model_parallel'):
                # For parallel parameters, merge the values
                print(' parallel parameter merge with stride {} along '
                      'dimention {}'.format(merged_param.stride,
                                            merged_param.partition_dim))
                merge_partitions(merged_param,
                                 partitions_param,
                                 merged_param.partition_dim,
                                 merged_param.stride)
            else:
                # For the non-parallel parameters, simply copy the rank 0
                # values.
                print(' none-parallel parameter, simple copy from rank 0')
                with torch.no_grad():
                    merged_param.data.copy_(partitions_param[0].data)
        except StopIteration:
            # Any exhausted generator ends the walk.
            break

    # Save the model.
    args.model_parallel_size = 1
    mpu.initialize.set_model_parallel_rank(0)
    sd = {
        'model': merged_model.state_dict_for_save_checkpoint(),
        'iteration': iteration,
    }
    merged_path = os.path.join(args.load, 'merged')
    checkpoint_name = get_checkpoint_name(merged_path, iteration)
    ensure_directory_exists(checkpoint_name)
    print('> saving merged model to {}'.format(checkpoint_name))
    torch.save(sd, checkpoint_name)

    print('done :-)')


if __name__ == '__main__':
    main()
The following steps show how to prepare the training dataset used to train the model.
# Libraries to install
```
pip install ftfy langdetect numpy torch pandas nltk sentencepiece boto3 tqdm regex bs4 newspaper3k htmlmin tldextract
git clone https://github.com/mattilyra/LSH
cd LSH
python setup.py install
```
# Download the dataset
1. Download the deduplicated URLs from [jcpeterson](https://mega.nz/#F!EZZD0YwJ!9_PlEQzdMVLaNdKv_ICNVQ!cc4RgQQZ)
2. Remove blacklisted URLs.
```
python blacklist_urls.py <path to the downloaded deduplicated URLs> <filename for clean urls. e.g. clean_urls.txt>
```
3. Download the content from the clean urls with [openwebtext's utilities](https://github.com/eukaryote31/openwebtext/blob/master/download.py).
4. Merge the contents into one loose json file with 1 json per newline of the format `{'text': text, 'url': unique_url}`. It is important for the url to be unique.
# Prepare the data for GPT-2 training:
1. Perform ftfy, english detection and remove documents with less than 128 tokens. This step can be sharded and run on shards.
```
python cleanup_dataset.py <input data file> <output cleaned data filename>
```
2. Using LSH, find possible duplicates and store them in a file for later processing. This step can NOT be sharded and usually takes 12 to 24 hours for the OpenWebText dataset.
```
python find_duplicates.py <input cleaned data file> <output possible duplicate urls filename>
```
3. Based on the similarity measure defined inside the function `is_similar` (default: 0.9), group urls that are similar. For each group, we should keep only one url and remove the rest.
```
python group_duplicate_urls.py <possible duplicate urls file> <output file containing similar urls>
```
4. Remove similar documents that were detected in the last step.
```
python remove_group_duplicates.py <file containing similar documents> <cleaned data file> <output file containing deduplicated data>
```
5. Shuffle the dataset.
```
shuf <cleaned deduped data file> -o train_data.json
```
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import re
import time
import tldextract
import sys
# Registered domain names (no TLD, no sub-domain) whose urls are dropped.
domain_blacklist = {
    '500px',
    'aapks',
    'akamaihd',
    'amazon',
    'apple',
    'artifactfire',
    'artstation',
    'awwni',
    'bandcamp',
    'battleforthenet',
    'coinscalendar',
    'dailymotion',
    'deviantart',
    'discord',
    'discordapp',
    'dlapkandroid',
    'dropbox',
    'e621',
    'ebay',
    'edealinfo',
    'erome',
    'eroshare',
    'explosm',
    'facebook',
    'fbcdn',
    'flickr',
    'furaffinity',
    'futhead',
    'gatopardo',
    'gfycat',
    'gifsound',
    'gifsoup',
    'giphy',
    'github',
    'google',
    'gunprime',
    'gyazo',
    'hotdealstar',
    'imagefap',
    'imageshack',
    'imgflip',
    'imgur',
    'instagram',
    'karmadecay',
    'kryptocal',
    'kym-cdn',
    'liveleak',
    'livememe',
    'lmgtfy',
    'magaimg',
    'memegenerator',
    'minorplanetcenter',
    'minus',
    'mobafire',
    'morejpeg',
    'nocookie',
    'pcpartpicker',
    'photobucket',
    'pinimg',
    'pinterest',
    'pixiv',
    'pornhub',
    'prntscr',
    'puu',
    'qkme',
    'quickmeme',
    'radd',
    'redd',
    'reddit',
    'reddit-stream',
    'redditlog',
    'redditmedia',
    'reddituploads',
    'redtube',
    'reupp',
    'reverb',
    'roanoke',
    'rollingstone',
    'sli',
    'soundcloud',
    'soundgasm',
    'spankbang',
    'spotify',
    'strawpoll',
    'streamable',
    'timeanddate',
    'tinypic',
    'touhouradio',
    'tumblr',
    'twimg',
    'twitch',
    'twitter',
    'vid',
    'vimeo',
    'vine',
    'vkaao',
    'vocaroo',
    'voyagefusion',
    'walmart',
    'wciu',
    'wikimedia',
    'wikipedia',
    'xhamster',
    'xkcd',
    'xvideos',
    'youtu',
    'youtube',
    'youtubedoubler',
    'ytimg',
    'zillexplorer',
}
def domain_is_in_blacklist(url):
    """Return True when the domain ``tldextract`` finds in ``url`` is
    listed in ``domain_blacklist``."""
    extracted = tldextract.extract(url)
    return extracted.domain in domain_blacklist
# List of extentions to blacklist.
# Kept as a tuple so it can be passed directly to `str.endswith`.
# Every entry must be followed by a comma: adjacent string literals are
# concatenated, which previously fused '.7z' and '.ai' into '.7z.ai'.
extentions_blacklist = (
    '.3gp',
    '.7z',
    '.ai',
    '.aif',
    '.apk',
    '.app',
    '.avi',
    '.bin',
    '.bmp',
    '.bz2',
    '.css',
    '.csv',
    '.dat',
    '.deb',
    '.dmg',
    '.doc',
    '.docx',
    '.exe',
    '.gif',
    '.gifv',
    '.gz',
    '.iso',
    '.jar',
    '.jpeg',
    '.jpg',
    '.js',
    '.log',
    '.mid',
    '.midi',
    '.mkv',
    '.mov',
    '.mp3',
    '.mp4',
    '.mpeg',
    '.mpg',
    '.ogg',
    '.ogv',
    '.otf',
    '.pdf',
    '.pkg',
    '.png',
    '.pps',
    '.ppt',
    '.pptx',
    '.psd',
    '.py',
    '.qt',
    '.ram',
    '.rar',
    '.sql',
    '.svg',
    '.swf',
    '.tar.gz',
    '.tar',
    '.tgz',
    '.tiff',
    '.ttf',
    '.txt',
    '.wav',
    '.webm',
    '.wma',
    '.wmv',
    '.xls',
    '.xlsx',
    '.xml',
    '.xz',
    '.zip',
)
def extention_is_in_blacklist(url):
    """Return True when ``url``, ignoring everything from the first ``?``
    and ignoring case, ends with one of ``extentions_blacklist``."""
    without_query = url.split('?')[0]
    return without_query.lower().endswith(extentions_blacklist)
# Malformed urls.
# This function is adapted from:
# https://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-python-malformed-or-not
url_regex = re.compile(
    r'^(?:http)s?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$',
    re.IGNORECASE,
)


def url_is_malformed(url):
    """Return True when ``url`` does not match ``url_regex``."""
    matched = url_regex.match(url)
    return matched is None
def print_progress(prefix, start_time, urls_counter,
                   domain_blacklist_counter,
                   extention_blacklist_counter,
                   short_url_counter, malformed_url_counter,
                   duplicate_url_counter):
    """Print one ' | '-separated status line with the filtering counters.

    ``start_time`` is a ``time.time()`` timestamp; the line reports the
    seconds elapsed since then. Output is flushed immediately.
    """
    elapsed = time.time() - start_time
    segments = [
        prefix + ' | ',
        'time elapsed (s): {:.2f} | '.format(elapsed),
        'number of urls: {} | '.format(urls_counter),
        'domain blacklisted: {} | '.format(domain_blacklist_counter),
        'extention blacklisted: {} | '.format(extention_blacklist_counter),
        'short urls (<=8): {} | '.format(short_url_counter),
        'malformed urls: {} | '.format(malformed_url_counter),
        'duplicate urls: {}'.format(duplicate_url_counter),
    ]
    print(''.join(segments), flush=True)
if __name__ == '__main__':
    # Usage: <script> <directory holding *.txt url files> <output url file>

    print('remove blacklisted urls ..')

    # Path to the url files and the output url file.
    path, output = sys.argv[1], sys.argv[2]

    # Get the list of url files.
    files = glob.glob(path + '/*.txt')
    print('> found {} files'.format(len(files)))

    urls = set()
    urls_counter = 0
    domain_blacklist_counter = 0
    extention_blacklist_counter = 0
    short_url_counter = 0
    malformed_url_counter = 0
    duplicate_url_counter = 0
    start_time = time.time()
    for filename in files:
        with open(filename, 'r') as url_file:
            for raw_line in url_file:
                url = raw_line.strip()
                urls_counter += 1
                # The first matching rule wins; only urls that pass every
                # rule are kept.
                if domain_is_in_blacklist(url):
                    print('[DOMAIN BLACKLIST]: {}'.format(url), flush=True)
                    domain_blacklist_counter += 1
                elif extention_is_in_blacklist(url):
                    print('[EXTENTION BLACKLIST]: {}'.format(url), flush=True)
                    extention_blacklist_counter += 1
                elif len(url) <= 8:
                    print('[SHORT URL]: {}'.format(url), flush=True)
                    short_url_counter += 1
                elif url_is_malformed(url):
                    print('[MALFORMED URL]: {}'.format(url), flush=True)
                    malformed_url_counter += 1
                elif url in urls:
                    print('[DUPLICATE URL]: {}'.format(url), flush=True)
                    duplicate_url_counter += 1
                else:
                    urls.add(url)
                if urls_counter % 100000 == 0:
                    print_progress('PROGRESS', start_time, urls_counter,
                                   domain_blacklist_counter,
                                   extention_blacklist_counter,
                                   short_url_counter, malformed_url_counter,
                                   duplicate_url_counter)

    print_progress('FINAL', start_time, urls_counter,
                   domain_blacklist_counter,
                   extention_blacklist_counter,
                   short_url_counter, malformed_url_counter,
                   duplicate_url_counter)

    # Write the final set of urls.
    print('> writing cleaned up url list to {}'.format(output))
    with open(output, 'w') as f:
        f.writelines(url + '\n' for url in urls)

    print('done :-)')
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ftfy
import json
from langdetect import detect
import numpy as np
import time
import os
import sys
from tokenizer import Tokenizer
# Documents with fewer tokens than this are dropped by `filter_corpus`.
MIN_DOCUMENT_LENGHT = 128


def print_progress(prefix, start_time, num_docs, num_fixed_text,
                   num_non_english_docs, chars_non_english_docs,
                   num_small_docs, chars_small_docs):
    """Print one ' | '-separated status line with the cleanup counters.

    ``start_time`` is a ``time.time()`` timestamp; the line reports the
    seconds elapsed since then. Output is flushed immediately.
    """
    elapsed = time.time() - start_time
    segments = [
        prefix + ' | ',
        'elapsed time: {:.2f} | '.format(elapsed),
        'documents: {} | '.format(num_docs),
        'fixed text: {} | '.format(num_fixed_text),
        'non-english: {} | '.format(num_non_english_docs),
        'non-english chars: {} | '.format(chars_non_english_docs),
        'small docs: {} | '.format(num_small_docs),
        'small docs chars: {}'.format(chars_small_docs),
    ]
    print(''.join(segments), flush=True)
def filter_corpus(filename, out_filename, print_interval=10000):
    """Clean a loose-json corpus and write the surviving documents.

    Each line of ``filename`` is parsed as JSON. Its ``'text'`` field is
    fixed with ``ftfy.fix_text``; documents whose detected language is not
    ``'en'`` and documents with fewer than ``MIN_DOCUMENT_LENGHT`` tokens are
    dropped. The rest are written to ``out_filename`` as UTF-8 JSON, one
    document per line. A line that raises any exception is reported and
    skipped.

    Args:
        filename: input file, one JSON document per line.
        out_filename: output file (opened in binary write mode).
        print_interval: a progress line is printed whenever a document is
            written and the number of lines read so far is a multiple of
            this value.
    """
    print(' > filtering {}'.format(filename))

    tokenizer = Tokenizer(cache_dir='./cache')

    num_docs = 0
    num_written_docs = 0
    num_small_docs = 0
    num_fixed_text = 0
    num_non_english_docs = 0
    chars_non_english_docs = 0
    chars_small_docs = 0
    start_time = time.time()
    with open(out_filename, 'wb') as f, open(filename, 'r') as fin:
        for line in fin:
            try:
                num_docs += 1
                doc = json.loads(line)

                # Fix text
                text = ftfy.fix_text(doc['text'])
                if text != doc['text']:
                    num_fixed_text += 1
                doc['text'] = text

                # Detect language.
                if detect(text) != 'en':
                    print('[non-english text]', doc)
                    num_non_english_docs += 1
                    chars_non_english_docs += len(text)
                    continue

                # On average each token is 5 characters so 8 is an
                # upper bound; longer texts skip the tokenization.
                if len(text) < (8 * MIN_DOCUMENT_LENGHT):
                    tokens = tokenizer.tokenize_document(text)
                    if len(tokens) < MIN_DOCUMENT_LENGHT:
                        print('[small document, skipping]:', doc)
                        num_small_docs += 1
                        chars_small_docs += len(text)
                        continue

                serialized = json.dumps(doc, ensure_ascii=False)
                f.write((serialized + '\n').encode('utf-8'))
                num_written_docs += 1
                if num_docs % print_interval == 0:
                    print_progress('[PROGRESS]', start_time, num_docs,
                                   num_fixed_text, num_non_english_docs,
                                   chars_non_english_docs,
                                   num_small_docs, chars_small_docs)
            except Exception as e:
                print(' skipping ', line, e)

    print_progress('[FINAL]', start_time, num_docs,
                   num_fixed_text, num_non_english_docs,
                   chars_non_english_docs,
                   num_small_docs, chars_small_docs)
if __name__ == '__main__':
    # Usage: <script> <input data file> <output cleaned data file>

    print('building gpt2 dataset ...')

    input_filename, output_filename = sys.argv[1], sys.argv[2]

    print('will be reading {}'.format(input_filename))
    print('and will write the results to {}'.format(output_filename))

    filter_corpus(input_filename, output_filename)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import json
from lsh import cache, minhash
import time
import sys
# This function is adapted from:
# https://github.com/mattilyra/LSH/blob/master/examples/Introduction.ipynb
def shingles(text, char_ngram=5):
    """Return the set of all character n-grams of ``text``.

    Args:
        text: the string to shingle.
        char_ngram: length of each n-gram.

    Returns:
        A set with every substring of length ``char_ngram``; empty when
        ``text`` is shorter than ``char_ngram``.
    """
    # A string of length L has L - n + 1 n-grams; the `+ 1` keeps the last
    # one, which the previous range bound dropped.
    return set(text[head:head + char_ngram]
               for head in range(0, len(text) - char_ngram + 1))
# This function is adapted from:
# https://github.com/mattilyra/LSH/blob/master/examples/Introduction.ipynb
def jaccard(set_a, set_b):
    """Return the Jaccard similarity ``|A & B| / |A | B|`` of two sets.

    Raises:
        ZeroDivisionError: if both sets are empty.
    """
    shared_count = len(set_a & set_b)
    total_count = len(set_a | set_b)
    return shared_count / total_count
if __name__ == '__main__':
    # Usage: <script> <input cleaned data file> <output possible duplicates>

    print('finding possible duplicate content ...')

    # Named *_path so the `input` builtin is not shadowed.
    input_path = sys.argv[1]
    output_path = sys.argv[2]

    hasher = minhash.MinHasher(seeds=100, char_ngram=5, hashbytes=4)
    lshcache = cache.Cache(bands=10, hasher=hasher)

    # Pass 1: fingerprint every document and remember its text by url.
    counter = 0
    url_doc = {}
    start_time = time.time()
    with open(input_path, 'r') as f:
        for line in f:
            try:
                myjson = json.loads(line)
                url = myjson['url']
                text = myjson['text']
                counter += 1
                url_doc[url] = text
                lshcache.add_fingerprint(hasher.fingerprint(text), url)
            except Exception as e:
                print('Error:', e)
            if counter % 10000 == 0:
                print(' [read]> processed {} documents in {:.2f} seconds ...'.
                      format(counter, time.time() - start_time), flush=True)

    # Pass 2: inside every LSH bucket compare the first document against
    # all the others and record those above the similarity threshold.
    counter = 0
    start_time = time.time()
    deduped = 0
    with open(output_path, 'wb') as f:
        for b in lshcache.bins:
            for bucket_id in b:
                if len(b[bucket_id]) > 1:
                    items = list(b[bucket_id])
                    main_url = items[0]
                    main_dhingles = shingles(url_doc[main_url])
                    remove_urls = []
                    for other_url in items[1:]:
                        counter += 1
                        other_shingles = shingles(url_doc[other_url])
                        try:
                            jaccard_sim = jaccard(main_dhingles,
                                                  other_shingles)
                        except Exception as e:
                            print('Error:', e)
                            # Treat a failed comparison as "not similar"
                            # instead of reusing the previous pair's score
                            # (or hitting an undefined name on the first
                            # pair).
                            jaccard_sim = 0.0
                        if jaccard_sim > 0.5:
                            remove_urls.append({other_url: jaccard_sim})
                            deduped += 1
                        if counter % 10000 == 0:
                            print(' [write]> processed {} documents in {:.2f} '
                                  'seoncds and deduped {} documents ...'.
                                  format(counter, time.time() - start_time,
                                         deduped), flush=True)
                    if remove_urls:
                        myjson = json.dumps({main_url: remove_urls},
                                            ensure_ascii=False)
                        f.write(myjson.encode('utf-8'))
                        f.write('\n'.encode('utf-8'))

    print('done :-)')
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import sys
def is_similar(jaccard_similarity):
    """Return True when ``jaccard_similarity`` reaches the 0.9 threshold.

    Uses the argument itself; the previous body read a global named ``js``
    and only worked when the caller happened to define it.
    """
    return jaccard_similarity >= 0.9
if __name__ == '__main__':
    # Usage: <script> <possible duplicate urls file> <output groups file>

    print('grouping duplicate urls ...')

    in_path, out_path = sys.argv[1], sys.argv[2]

    # url -> group index, and group index -> set of urls. A group that was
    # merged into another one is replaced by None.
    url_to_index = {}
    index_to_urls = []
    counter = 0
    start_time = time.time()
    with open(in_path, 'r') as f:
        for line in f:
            counter += 1
            myjson = json.loads(line)

            # The main url plus every candidate that is similar enough.
            urls = []
            for main_url in myjson.keys():
                urls.append(main_url)
                for value in myjson[main_url]:
                    for other_url, js in value.items():
                        if is_similar(js):
                            urls.append(other_url)

            # Groups these urls already belong to: the first one found
            # becomes the target, the others are merged into it.
            known = [url_to_index[url] for url in urls if url in url_to_index]
            current_index = known[0] if known else -1
            other_indices = {index for index in known
                             if index != current_index}
            if current_index == -1:
                current_index = len(index_to_urls)
                index_to_urls.append(set())

            target_group = index_to_urls[current_index]
            for url in urls:
                url_to_index[url] = current_index
                target_group.add(url)
            for index in other_indices:
                for url in index_to_urls[index]:
                    target_group.add(url)
                    url_to_index[url] = current_index
                index_to_urls[index] = None

            if counter % 100000 == 0:
                print(' > processed {} lines in {} seconds ...'.format(
                    counter, time.time() - start_time))

    # One url of every multi-url group is kept, the rest are removed.
    total_remove = 0
    total_remain = 0
    for urls in index_to_urls:
        if urls is not None and len(urls) > 1:
            total_remove += (len(urls) - 1)
            total_remain += 1
    print('out of {} urls, only {} are unique and {} should be removed'.format(
        total_remove+total_remain, total_remain, total_remove))

    with open(out_path, 'wb') as f:
        for i, urls in enumerate(index_to_urls):
            if urls is not None and len(urls) > 1:
                myjson = json.dumps({str(i): list(urls)},
                                    ensure_ascii=False)
                f.write(myjson.encode('utf-8'))
                f.write('\n'.encode('utf-8'))
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import sys
import json
import argparse
if __name__ == '__main__':
    # Concatenate every `*.json` file of --json_path (one JSON document per
    # line) into --output_file.

    parser = argparse.ArgumentParser()
    parser.add_argument("--json_path", type=str, default=".",
                        help="path where all the json files are located")

    parser.add_argument("--output_file", type=str, default="merged_output.json",
                        help="filename where the merged json should go")

    args = parser.parse_args()

    json_path = args.json_path
    out_file = args.output_file

    json_files = glob.glob(json_path + '/*.json')

    with open(out_file, 'w') as outfile:
        for counter, fname in enumerate(json_files, start=1):

            if counter % 1024 == 0:
                print("Merging at ", counter, flush=True)

            with open(fname, 'r') as infile:
                for row in infile:
                    # Parsed only for validation: an invalid row raises here
                    # and aborts the merge.
                    json.loads(row)
                    # A file whose last row has no trailing newline would
                    # otherwise be glued to the next file's first row.
                    outfile.write(row if row.endswith('\n') else row + '\n')

    print("Merged file", out_file, flush=True)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import sys
if __name__ == '__main__':
    # Usage: <script> <groups of similar urls> <cleaned data> <output file>

    url_filename, data_filename, output_filename = (
        sys.argv[1], sys.argv[2], sys.argv[3])

    # In every group the first url is kept; all the others are removed.
    urls = set()
    with open(url_filename, 'r') as f:
        for line in f:
            for this_urls in json.loads(line).values():
                urls.update(this_urls[1:])
    print('will be removing {} urls'.format(len(urls)), flush=True)

    progress_template = (' [PROCESSED] time (s): {:.2f} | written: {} '
                         '| removed: {} (char: {})')

    written_docs = 0
    removed_docs = 0
    removed_chars = 0
    start_time = time.time()
    with open(output_filename, 'wb') as fout, open(data_filename, 'r') as fin:
        for line in fin:
            try:
                doc = json.loads(line)
                if doc['url'] in urls:
                    print('removing', doc)
                    removed_docs += 1
                    removed_chars += len(doc['text'])
                    continue
                serialized = json.dumps(doc, ensure_ascii=False)
                fout.write(serialized.encode('utf-8'))
                fout.write('\n'.encode('utf-8'))
                written_docs += 1
                if written_docs % 10000 == 0:
                    print(progress_template.format(
                        time.time() - start_time,
                        written_docs, removed_docs, removed_chars))
            except Exception as e:
                # A line that cannot be processed is reported and skipped.
                print('[SKIPPING]', line, e)

    print(progress_template.format(
        time.time() - start_time,
        written_docs, removed_docs, removed_chars))
    print('done :-)')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment