Unverified Commit a842a927 authored by Yuanyuan (Ana) Shen, committed by GitHub

[feat] Add CPU support for pipe.py benchmarks (#188)

* Add CPU support for pipe.py benchmarks, CUDA-free
parent f80b303c
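The hunks below all apply one pattern: every unconditional .cuda() call or hard-coded torch.device("cuda") is guarded by torch.cuda.is_available(), falling back to CPU so the benchmark also runs on CUDA-free hosts. A minimal sketch of that pattern (the helper names default_device and to_default_device are illustrative, not part of the patch):

import torch


def default_device() -> torch.device:
    # Prefer the GPU when one is present, otherwise run on the CPU.
    return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


def to_default_device(t: torch.Tensor) -> torch.Tensor:
    # Only move tensors to CUDA when it is actually available.
    return t.cuda() if torch.cuda.is_available() else t


if __name__ == "__main__":
    x = to_default_device(torch.zeros(4))
    print(default_device(), x.device)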
@@ -283,7 +283,9 @@ def train(lm_dataloader, model, criterion, optimizer, vocab_size, args):
     num_params = reduce(operator.add, (reduce(operator.mul, x.size()) for x in model.parameters()))
     if model.group:
-        total = torch.Tensor([num_params]).cuda()
+        total = torch.Tensor([num_params])
+        if torch.cuda.is_available():
+            total = total.cuda()
         torch.distributed.all_reduce(total, group=model.group)
         logging.info(
             f"training model, #prams = {num_params}, group: {model.group.rank()}, grank:"
@@ -305,6 +307,8 @@ def train(lm_dataloader, model, criterion, optimizer, vocab_size, args):
     if isinstance(model, DDP):
         model = model.module
+    if not torch.cuda.is_available():
+        return torch.device("cpu")
     if model.devices:
         return model.devices[0]
     else:
@@ -313,6 +317,9 @@ def train(lm_dataloader, model, criterion, optimizer, vocab_size, args):
 def get_last_device(model):
     if isinstance(model, DDP):
         model = model.module
+    if not torch.cuda.is_available():
+        return torch.device("cpu")
     if model.devices:
         return model.devices[-1]
     else:
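Both device helpers get the same early-out, so a CUDA-free run never consults model.devices. Roughly how the patched get_last_device reads after this change; the trailing else branch is not shown in the diff, so the final fallback below is an assumption:

import torch
from torch.nn.parallel import DistributedDataParallel as DDP


def get_last_device(model):
    if isinstance(model, DDP):
        model = model.module
    if not torch.cuda.is_available():
        return torch.device("cpu")  # CUDA-free hosts always use the CPU
    if model.devices:
        return model.devices[-1]  # device of the last pipeline stage
    return torch.device(torch.cuda.current_device())  # assumed fallback, not visible in the diff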
@@ -491,8 +498,8 @@ def generate_balance(num_devices, num_layers):
 def make_model_and_data(args, device, new_data: bool = True):
+    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
     if new_data:
-        device = torch.device("cuda")
         vocab_size = 10000
         model, criterion, optimizer, scaler = make_model(args, device, vocab_size)
         lm_dataset = BenchmarkLMDataset()
@@ -507,7 +514,6 @@ def make_model_and_data(args, device, new_data: bool = True):
             "vocab_size": vocab_size,
         }
     else:
-        device = torch.device("cuda")
         data = get_data(device)
         ntokens, train_data, val_data, test_data = data
         model, criterion, optimizer, scaler = make_model(args, device, ntokens)
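The two removals in make_model_and_data are the flip side of the added line at the top of the function: the device is chosen once, with a CPU fallback, instead of being hard-coded to CUDA separately in each branch. A toy stand-in for that refactor shape; the tensors below only mimic the generated and pre-existing datasets, they are not the benchmark's actual model or data code:

import torch


def make_model_and_data_sketch(new_data: bool = True):
    # Device picked once at the top, instead of per-branch torch.device("cuda").
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    if new_data:
        payload = torch.randn(8, 16, device=device)  # stands in for the generated LM dataset
    else:
        payload = torch.arange(128, device=device).float()  # stands in for pre-existing data
    return device, payload


if __name__ == "__main__":
    print(make_model_and_data_sketch()[0])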
@@ -520,10 +526,10 @@ def make_model_and_data(args, device, new_data: bool = True):
 def bench_single_process(args):
-    num_devices = torch.cuda.device_count()
+    num_devices = torch.cuda.device_count() if torch.cuda.is_available() else 1
     assert num_devices > 0
     init_random_seed(0)
-    device = torch.device("cuda")
+    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
     new_data = True
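The num_devices change matters because torch.cuda.device_count() returns 0 on a CUDA-free host, which would trip the assert just below it; treating the CPU as a single logical device keeps the pipeline balance generation working. A quick illustration of that fallback:

import torch

# On a GPU box this reports the real device count; on a CPU-only box it
# degrades to one logical device instead of zero.
num_devices = torch.cuda.device_count() if torch.cuda.is_available() else 1
assert num_devices > 0
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"{num_devices} device(s), default: {device}")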
@@ -557,12 +563,13 @@ def run_mp_worker(args, available_workers):
         style=Pipe.AsyncSchedule,
         chunks=args.chunks,
         worker_map=get_worker_map(),
-        input_device=torch.cuda.current_device(),
+        input_device=torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu"),
         pipelined_backward=args.pipelined_backward,
         checkpoint=args.checkpoint,
         # loss_fn=blob["criterion"],
-    ).cuda()
+    )
+    if torch.cuda.is_available():
+        p = p.cuda()
     if args.all_at_once and p.pipeline:
         print(f"running all at once")
         p.pipeline.all_at_once = True
@@ -678,7 +685,8 @@ parser.set_defaults(pipelined_backward=True)
 if __name__ == "__main__":
     args = parser.parse_args()
-    # bench_multi_process(args, all_at_once=True)
+    bench_multi_process(args, all_at_once=True)
+    if args.no_mpi or "OMPI_COMM_WORLD_RANK" not in os.environ:
         print(f"Running benchmark with args: {args}")
         bench_single_process(args)
@@ -736,7 +736,7 @@ class Pipe(Module):
             from .phony import get_phony
-            phony = get_phony(torch.device(torch.cuda.current_device()), requires_grad=True)
+            phony = get_phony(torch.device(torch.cuda.current_device() if torch.cuda.is_available() else "cpu"), requires_grad=True)
             output = PipelinedBackwardPass.apply(output, batches, phony, True)  # self.retain_graph)
         else:
             output = microbatch.gather(batches)
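The second touched file is the Pipe implementation itself (the class Pipe hunk above). The original call fails on CUDA-free hosts: torch.cuda.current_device() needs a working CUDA setup, and constructing torch.device from an integer ordinal always denotes a CUDA device, so the string "cpu" is the needed fallback. A small illustration of just the device choice; the zero-element tensor only mimics the idea of a phony gradient anchor and is not fairscale's get_phony:

import torch

if torch.cuda.is_available():
    dev = torch.device(torch.cuda.current_device())  # an int ordinal always means a CUDA device
else:
    dev = torch.device("cpu")

# Zero-element tensor with requires_grad, just to show where the device lands.
phony = torch.empty(0, device=dev, requires_grad=True)
print(phony.device)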