Unverified commit 62156100, authored by Lintang Sutawika, committed by GitHub

Merge pull request #488 from fattorib/multigpu-feature

Data Parallelism
parents 0375b792 d924ca33
@@ -104,6 +104,19 @@ python write_out.py \
 This will write out one text file for each task.
 
+## Multi-GPU Evaluation
+
+Multi-GPU evaluation is supported through [accelerate](https://github.com/huggingface/accelerate). To initialize the distributed environment, run `accelerate config` in a terminal and follow the prompts. Once the environment is configured, evaluations can be launched with:
+
+```bash
+accelerate launch main.py \
+    --model hf-causal \
+    --tasks lambada_openai,arc_easy \
+    --batch_size 16
+```
+
+**Warning**: Distributed evaluation requires launching multiple processes of the evaluation script. Running `python main.py *args*` instead of `accelerate launch main.py *args*` on a machine with multiple GPUs will run the evaluation on only a single device.
+
 ## Implementing new tasks
 
 To implement a new task in the eval harness, see [this guide](./docs/task_guide.md).

...
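The README section above covers the interactive setup path. As a hedged illustration (not part of this commit), the `accelerate config` step can also be skipped by passing launcher flags directly; `--num_processes` is a standard `accelerate launch` option, and the GPU count and task list below are assumed example values:

```bash
# Hypothetical launch on an assumed 4-GPU machine; flags other than
# --num_processes mirror the README example above.
accelerate launch --num_processes=4 main.py \
    --model hf-causal \
    --tasks lambada_openai,arc_easy \
    --batch_size 16
```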
@@ -285,7 +285,7 @@ class Task(abc.ABC):
     def doc_to_target(self, doc):
         pass
 
-    def build_all_requests(self, limit=None):
+    def build_all_requests(self, limit=None, rank=None, world_size=None):
         """Build a set of Instances for a task, and store them in task.instances"""
         if self.has_test_docs():
             docs = self.test_docs()

@@ -297,10 +297,10 @@ class Task(abc.ABC):
         ), f"Task dataset (path={self.DATASET_PATH}, name={self.DATASET_NAME}) must have valid or test docs!"
 
         instances = []
-        for doc_id, doc in enumerate(
-            itertools.islice(docs, 0, limit) if limit else docs
+        for doc_id, doc in utils.create_iterator(
+            enumerate(docs), rank, world_size, limit
         ):
-            # sample fewshot context
+            # sample fewshot context #TODO: need to offset doc_id by rank now!
             fewshot_ctx = self.fewshot_context(
                 doc, self._config.num_fewshot, rnd=random.Random()
             )

...
@@ -2,6 +2,8 @@ import random
 import itertools
 import collections
+import torch
 
 import numpy as np
 import lm_eval.api

@@ -14,6 +16,7 @@ from lm_eval.utils import (
     positional_deprecated,
     run_task_tests,
     make_table,
+    create_iterator,
     get_git_commit_hash,
 )

@@ -89,6 +92,7 @@ def simple_evaluate(
         decontamination_ngrams_path=decontamination_ngrams_path,
     )
 
+    if lm.rank == 0:
         # add info about the model and few shot config
         results["config"] = {
             "model": model,

@@ -101,8 +105,9 @@
             "bootstrap_iters": bootstrap_iters,
         }
         results["git_hash"] = get_git_commit_hash()
         return results
+    else:
+        return None
 
 decontaminate_suffix = "_decontaminate"
@@ -152,8 +157,8 @@ def evaluate(
         # rnd.seed(42)
         # rnd.shuffle(task_docs)
 
-        # for doc_id, doc in enumerate(itertools.islice(task_docs, 0, limit)):
-        task.build_all_requests(limit=limit)
+        task.build_all_requests(limit=limit, rank=lm.rank, world_size=lm.world_size)
 
         # aggregate Instances by LM method requested to get output.
         reqtype = (
             "loglikelihood"

@@ -162,6 +167,15 @@
         )  # TODO: this is hacky, fix in task.py
 
         requests[reqtype].extend(task.instances)
 
+        if lm.world_size > 1:
+            instances_rnk = torch.tensor(len(task._instances), device=lm.device)
+            gathered_item = (
+                lm.accelerator.gather(instances_rnk).cpu().detach().numpy().tolist()
+            )
+            # compute number of pseudobatches to pad with (FSDP/DDP require even batches among ranks)
+            numpad = max(gathered_item) - gathered_item[lm.rank]
+
     ### Run LM on inputs, get all outputs ###
     # execute each type of request
     for reqtype, reqs in requests.items():
@@ -171,6 +185,10 @@
         for req in reqs:
             cloned_reqs.extend([req] * req.repeats)
 
+        if (lm.world_size > 1) and (numpad > 0):
+            for _ in range(numpad):
+                cloned_reqs.extend([req] * req.repeats)
+
         # run requests through model
         resps = getattr(lm, reqtype)(cloned_reqs)

@@ -178,6 +196,9 @@
         for x, req in zip(resps, cloned_reqs):
             req.resps.append(x)
 
+        if lm.world_size > 1:
+            lm.accelerator.wait_for_everyone()
+
     ### Postprocess outputs ###
     # TODO: del model here, maybe (idea: allow user to specify device of e.g. reward model separately)
     for task_name, task in task_dict.items():
@@ -192,11 +213,16 @@
         # calculate values for each filter setup (TODO: make getting list of keys cleaner)
         # TODO: make it possible to use a different metric per key
         for key in task.instances[0].filtered_resps.keys():
-            for doc_id, doc in enumerate(
-                itertools.islice(task.test_docs(), 0, limit)
-                if task.has_test_docs()
-                else task.validation_docs()
-            ):
+            doc_iterator = (
+                itertools.islice(
+                    enumerate(task.test_docs()), lm.rank, limit, lm.world_size
+                )
+                if task.has_test_docs()
+                else itertools.islice(
+                    enumerate(task.validation_docs()), lm.rank, limit, lm.world_size
+                )
+            )
+            for doc_id, doc in doc_iterator:
                 # subset instances to only this document id ; sort by idx
                 requests = list(filter(lambda x: x.doc_id == doc_id, task.instances))
                 requests.sort(key=lambda x: x.idx)
@@ -206,13 +232,51 @@
                     for metric, value in metrics.items():
                         vals[(task_name, key, metric)].append(value)
 
+    if lm.world_size > 1:
+        # if multigpu, then gather data across all ranks
+        vals_torch = collections.defaultdict(list)
+        for (task_name, key, metric), items in vals.items():
+            numitem = 0
+            if type(items[0]) == tuple:
+                numitem = len(items[0])
+
+            # distributed gather requires all ranks to have same dimensions
+            # so we pad out with float32 min value
+            pad_value = torch.finfo(torch.float32).min
+            metrics_tensor = torch.tensor(items, device=lm.device)
+
+            original_dtype = metrics_tensor.dtype  # store original dtype
+            torch_device_tensor = lm.accelerator.pad_across_processes(
+                metrics_tensor.to(torch.float32), pad_index=pad_value
+            )
+            gathered_item = lm.accelerator.gather(torch_device_tensor)
+
+            if numitem > 0:
+                gathered_filtered = gathered_item[gathered_item[:, 0] != pad_value]
+            else:
+                gathered_filtered = gathered_item[gathered_item != pad_value]
+
+            gathered_item = (
+                gathered_filtered.to(original_dtype).cpu().detach().numpy().tolist()
+            )
+            # reconvert if we were passed a tuple of values
+            if numitem > 0:
+                gathered_item = [tuple(g) for g in gathered_item]
+
+            if lm.rank == 0:
+                vals_torch[(task_name, key, metric)] = gathered_item
+
+        vals = vals_torch
+
+    if lm.rank == 0:
         ### Aggregate results over all datapoints ###
         # aggregate results ; run bootstrap CIs
         for (task_name, key, metric), items in vals.items():
             task = task_dict[task_name]
-        results[task_name][metric + " - filter=" + key] = task.aggregation()[metric](
-            items
-        )
+            results[task_name][metric + " - filter=" + key] = task.aggregation()[
+                metric
+            ](items)
 
             # hotfix: bleu, chrf, ter seem to be really expensive to bootstrap
             # so we run them less iterations. still looking for a cleaner way to do this
@@ -225,6 +289,11 @@
             )
             if stderr is not None:
-                results[task_name][metric + " - filter=" + key + "_stderr"] = stderr(items)
+                results[task_name][metric + " - filter=" + key + "_stderr"] = stderr(
+                    items
+                )
 
         return {"results": dict(results), "versions": dict(versions)}
+    else:
+        return None
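Both `world_size > 1` branches in the evaluator changes above rely on the same trick: collectives need identically shaped data on every rank, so shorter ranks are padded and the padding is discarded afterwards. A minimal single-process sketch of that arithmetic (variable names and values here are illustrative, not from the codebase):

```python
import torch

# 1) Request padding: each rank reports how many requests it built; ranks with
#    fewer requests repeat their last request numpad times so DDP/FSDP see
#    evenly sized batches, and the duplicate responses are later ignored.
per_rank_counts = [103, 97, 100]  # assumed counts gathered from 3 ranks
for rank, count in enumerate(per_rank_counts):
    numpad = max(per_rank_counts) - count
    print(f"rank {rank}: {count} real requests + {numpad} padding requests")

# 2) Metric gathering: per-rank metric lists can also differ in length, so they
#    are padded with float32's minimum value before the gather and filtered out
#    afterwards, mirroring the pad_across_processes/gather usage above.
pad_value = torch.finfo(torch.float32).min
rank0 = torch.tensor([0.5, 1.0, 0.25])  # assumed metric values from rank 0
rank1 = torch.tensor([0.75, 0.125])     # assumed metric values from rank 1
width = max(len(rank0), len(rank1))
padded = torch.stack(
    [torch.nn.functional.pad(t, (0, width - len(t)), value=pad_value) for t in (rank0, rank1)]
)
gathered = padded.flatten()             # stand-in for lm.accelerator.gather()
kept = gathered[gathered != pad_value]  # drop the pad entries
print(kept.tolist())                    # [0.5, 1.0, 0.25, 0.75, 0.125]
```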
@@ -9,6 +9,9 @@ from lm_eval import utils
 from lm_eval.logger import eval_logger
 from lm_eval.api.model import LM, register_model
 
+from accelerate import Accelerator
+from itertools import islice
+
 
 @register_model("hf-causal", "gpt2")
 class HFLM(LM):
@@ -28,19 +31,26 @@ class HFLM(LM):
         assert isinstance(pretrained, str)
         assert isinstance(batch_size, int)
 
+        gpus = torch.cuda.device_count()
+        if gpus <= 1:
             if device:
                 if device not in ["cuda", "cpu"]:
                     device = int(device)
                 self._device = torch.device(device)
-                eval_logger.info(f"Using device '{device}'")
+                print(f"Using device '{device}'")
             else:
-                eval_logger.warning("Device not specified")
-                eval_logger.info(f"Cuda Available? {torch.cuda.is_available()}")
+                print("Device not specified")
+                print(f"Cuda Available? {torch.cuda.is_available()}")
                 self._device = (
                     torch.device("cuda")
                     if torch.cuda.is_available()
                     else torch.device("cpu")
                 )
+            self._rank = 0
+            self._world_size = 1
+        else:
+            self._device = "cpu"
 
         # TODO: update this to be less of a hack once subfolder is fixed in HF
         revision = revision + ("/" + subfolder if subfolder is not None else "")
@@ -60,10 +70,30 @@
         # multithreading and batching
         self.batch_size_per_gpu = batch_size  # todo: adaptive batch size
 
-        # TODO: fix multi-gpu
-        # gpus = torch.cuda.device_count()
-        # if gpus > 1:
-        #     self.gpt2 = nn.DataParallel(self.gpt2)
+        # multigpu support with accelerate
+        if gpus > 1:
+            accelerator = Accelerator()
+            if gpus > accelerator.num_processes:
+                warning = (
+                    "WARNING: The number of total system GPUs does not match the number of spawned processes. "
+                    "If you would like to use data parallelism, please launch the script "
+                    "with 'accelerate launch *script*'. "
+                    f"Current run will proceed with {accelerator.num_processes} devices."
+                )
+                print(warning)
+                self._rank = accelerator.local_process_index
+                self._world_size = accelerator.num_processes
+            else:
+                self.gpt2 = accelerator.prepare(self.gpt2)
+                self._device = torch.device(f"cuda:{accelerator.local_process_index}")
+                self.accelerator = accelerator
+
+                if self.accelerator.is_local_main_process:
+                    print(f"Using {gpus} devices with data parallelism")
+
+                self._rank = self.accelerator.local_process_index
+                self._world_size = self.accelerator.num_processes
 
     @property
     def eot_token_id(self):
@@ -73,9 +103,17 @@
     @property
     def max_length(self):
         try:
+            if hasattr(self, "accelerator"):
+                return self.accelerator.unwrap_model(self.gpt2).config.n_ctx
+            else:
                 return self.gpt2.config.n_ctx
         except AttributeError:
             # gptneoconfig doesn't have n_ctx apparently
+            if hasattr(self, "accelerator"):
+                return self.accelerator.unwrap_model(
+                    self.gpt2
+                ).config.max_position_embeddings
+            else:
                 return self.gpt2.config.max_position_embeddings
 
     @property
@@ -84,14 +122,20 @@
     @property
     def batch_size(self):
-        # TODO: fix multi-gpu
-        return self.batch_size_per_gpu  # * gpus
+        return self.batch_size_per_gpu
 
     @property
     def device(self):
-        # TODO: fix multi-gpu
         return self._device
 
+    @property
+    def rank(self):
+        return self._rank
+
+    @property
+    def world_size(self):
+        return self._world_size
+
     def tok_encode(self, string: str):
         return self.tokenizer.encode(string, add_special_tokens=False)
@@ -138,7 +182,7 @@ class HFLM(LM):
         # TODO: automatic batch size detection for vectorization
         loglikelihoods = []
 
-        for (string,) in tqdm([req.args for req in requests]):
+        for (string,) in tqdm([req.args for req in requests], disable=(self.rank != 0)):
             rolling_token_windows = list(
                 map(
                     utils.make_disjoint_window,
@@ -155,10 +199,26 @@
             # TODO: extract out this call so it only gets called once and also somehow figure out partial caching for
             # that
 
+            pad_amnt = 0
+            if self.world_size > 1:
+                # TODO: Comment on what we do here
+                mytensor = torch.tensor(len(rolling_token_windows), device=self.device)
+                gathered = (
+                    self.accelerator.gather(mytensor).cpu().detach().numpy().tolist()
+                )
+                pad_amnt = max(gathered) - gathered[self.rank]
+                if pad_amnt > 0:
+                    rolling_token_windows += pad_amnt * [rolling_token_windows[0]]
+
             string_nll = self._loglikelihood_tokens(
                 rolling_token_windows, disable_tqdm=True
             )
 
+            if (self.world_size > 1) and (pad_amnt > 0):
+                string_nll = [x[0] for x in string_nll[:-pad_amnt]]
+            else:
                 # discard is_greedy
                 string_nll = [x[0] for x in string_nll]
@@ -185,8 +245,10 @@
         # TODO: automatic (variable) batch size detection for vectorization
         re_ord = utils.Reorderer(requests, _collate)
+
         for chunk in utils.chunks(
-            tqdm(re_ord.get_reordered(), disable=disable_tqdm), self.batch_size
+            tqdm(re_ord.get_reordered(), disable=(disable_tqdm or (self.rank != 0))),
+            self.batch_size,
         ):
             inps = []
             cont_toks_list = []
             inplens = []

...
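One detail worth noting from the model changes above: once `accelerator.prepare` has wrapped the model (e.g. in DDP under `accelerate launch`), attributes such as `config` live on the underlying module, which is why `max_length` unwraps first. A small sketch of that pattern, using the `gpt2` checkpoint purely as an assumed example; `unwrap_model` is a no-op when the model was never wrapped, so this also runs in a single process:

```python
from accelerate import Accelerator
from transformers import AutoModelForCausalLM

accelerator = Accelerator()
model = accelerator.prepare(AutoModelForCausalLM.from_pretrained("gpt2"))

# Read the context length through the (possibly) wrapped model.
config = accelerator.unwrap_model(model).config
max_length = getattr(config, "n_ctx", None) or config.max_position_embeddings
print(max_length)  # 1024 for gpt2
```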
@@ -13,6 +13,7 @@ from typing import List
 from omegaconf import OmegaConf
 from jinja2 import BaseLoader, Environment, StrictUndefined
 
+from itertools import islice
 
 class ExitCodeError(Exception):

@@ -317,3 +318,12 @@ env = Environment(loader=BaseLoader, undefined=StrictUndefined)
 def apply_template(template, doc):
     rtemplate = env.from_string(template)
     return rtemplate.render(**doc)
+
+
+def create_iterator(raw_iterator, rank, world_size, limit=None):
+    """
+    Method for creating a (potentially) sliced and limited
+    iterator from a raw document iterator. Used for splitting data
+    among ranks in multigpu setting or only pulling a sample of documents
+    """
+    return islice(raw_iterator, rank, limit, world_size)
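Since `create_iterator` is a strided `islice`, its sharding behavior can be checked locally without any distributed setup. A standalone sketch (toy data, assumed world size of 2) showing that each rank sees an interleaved shard while `doc_id` values stay global because `enumerate` runs before slicing; note that `limit` caps the global document index, so each rank gets roughly `limit / world_size` documents rather than `limit`:

```python
from itertools import islice

def create_iterator(raw_iterator, rank, world_size, limit=None):
    # mirrors the helper added above
    return islice(raw_iterator, rank, limit, world_size)

docs = [f"doc_{i}" for i in range(10)]  # assumed toy dataset
for rank in range(2):                   # assumed world_size = 2
    shard = list(create_iterator(enumerate(docs), rank, world_size=2, limit=6))
    print(rank, shard)
# 0 [(0, 'doc_0'), (2, 'doc_2'), (4, 'doc_4')]
# 1 [(1, 'doc_1'), (3, 'doc_3'), (5, 'doc_5')]
```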
@@ -54,7 +54,7 @@ def pattern_match(patterns, source_list):
     for pattern in patterns:
         for matching in fnmatch.filter(source_list, pattern):
             task_names.add(matching)
-    return list(task_names)
+    return sorted(list(task_names))
 
 
 def main():

@@ -96,7 +96,7 @@ def main():
         decontamination_ngrams_path=args.decontamination_ngrams_path,
         check_integrity=args.check_integrity,
     )
 
+    if results is not None:
         dumped = json.dumps(results, indent=2)
         print(dumped)

...
@@ -21,6 +21,7 @@ setuptools.setup(
     ],
     python_requires=">=3.6",
     install_requires=[
+        "accelerate>=0.18.0",
         "datasets>=2.0.0",
         "jsonlines",
         "numexpr",

...