Unverified Commit 2f7ca414 authored by xiang song (charlie.song), committed by GitHub

[Bug fix] Use shared memory for grad sync when NCCL is not available as the PyTorch distributed backend. (#3034)

* Use shared memory for grad sync when NCCL is not available as the PyTorch distributed backend.

Fix small bugs and update unit tests

* Fix bug

* update test

* update test

* Fix unit test

* Fix unit test

* Fix test

* Fix

* simple update
Co-authored-by: Ubuntu <ubuntu@ip-172-31-24-212.ec2.internal>
parent 31f4483a
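The heart of the patch is the dispatch in SparseGradOptimizer (first hunk below): the NCCL all-to-all path is used only when the gradients live on a GPU and the initialized torch.distributed backend is actually 'nccl'; otherwise (e.g. with gloo) gradients are exchanged through named shared memory. A minimal sketch of that decision, with the helper name illustrative and only the torch.distributed calls taken from the patch:

import torch as th

def choose_grad_sync(grads_on_gpu):
    # Mirrors the patched check in SparseGradOptimizer: NCCL-based gradient
    # exchange is only valid when the gradients sit on a GPU and the active
    # torch.distributed backend is actually 'nccl'; with gloo (or any other
    # backend) the optimizer falls back to shared-memory exchange.
    if grads_on_gpu and (not th.distributed.is_initialized()
                         or th.distributed.get_backend() == 'nccl'):
        return 'nccl'
    return 'shared_memory'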
@@ -70,7 +70,10 @@ class SparseGradOptimizer(abc.ABC):
else:
assert not self._device, \
"All gradients must be on the same device"
if self._device:
# distributed backend use nccl
if self._device and \
(not th.distributed.is_initialized() or th.distributed.get_backend() == 'nccl'):
# device is only set if the grads are on a GPU
self._comm_setup()
else:
@@ -284,9 +287,11 @@ class SparseGradOptimizer(abc.ABC):
# The overall buffer cost will be smaller than three times
# the maximum memory requirement for sharing gradients.
buffer_size = 128 if idx_i.shape[0] < 128 else idx_i.shape[0] * 2
idx_shmem = create_shared_mem_array(idx_shmem_name, \
idx_shmem = create_shared_mem_array(
'{}_{}'.format(idx_shmem_name, buffer_size), \
(buffer_size,), idx_dtype)
grad_shmem = create_shared_mem_array(grad_shmem_name, \
grad_shmem = create_shared_mem_array(
'{}_{}'.format(grad_shmem_name, buffer_size), \
(buffer_size, grad_dim), grad_dtype)
self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
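The buffer is allocated with slack (at least 128 slots, otherwise twice the current index count) and, per the patch, the chosen size is appended to the shared-memory name so a reader attaches to an array of matching capacity. A small worked example of both rules; the base name is hypothetical:

def shmem_buffer_size(num_idx):
    # Growth rule from the hunk above: at least 128 slots, otherwise twice the
    # current number of indices, so the buffer is resized rarely and the total
    # footprint stays below roughly three times the peak requirement.
    return 128 if num_idx < 128 else num_idx * 2

# e.g. 200 indices -> a 400-slot buffer, published under a size-tagged name
buffer_size = shmem_buffer_size(200)
idx_name = '{}_{}'.format('emb_idx_rank0', buffer_size)   # base name is hypothetical
assert (buffer_size, idx_name) == (400, 'emb_idx_rank0_400')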
@@ -321,9 +326,11 @@ class SparseGradOptimizer(abc.ABC):
if idx_shmem_name not in self._shared_cache[emb_name] or \
self._shared_cache[emb_name][idx_shmem_name].shape[0] < size:
buffer_size = 128 if size < 128 else size * 2
idx_shmem = get_shared_mem_array(idx_shmem_name, \
idx_shmem = get_shared_mem_array(
'{}_{}'.format(idx_shmem_name, buffer_size), \
(buffer_size,), idx_dtype)
grad_shmem = get_shared_mem_array(grad_shmem_name, \
grad_shmem = get_shared_mem_array(
'{}_{}'.format(grad_shmem_name, buffer_size), \
(buffer_size, grad_dim), grad_dtype)
self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
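Under a non-NCCL backend the trainers exchange gradients through these named shared-memory arrays: the producing rank creates the idx/grad buffers (previous hunk), and its peers attach to them by name (this hunk). A minimal sketch of that producer/consumer pattern, assuming the two helpers are importable from dgl.utils as in the optimizer module; names, sizes, and dtypes are illustrative:

import torch as th
# Assumed import path: the optimizer module uses these same helpers.
from dgl.utils import create_shared_mem_array, get_shared_mem_array

IDX_NAME = 'test_emb_idx_rank0_256'    # hypothetical size-tagged names
GRAD_NAME = 'test_emb_grad_rank0_256'

def publish_grads(idx, grad):
    # Producer side: the trainer owning these gradients creates the buffers
    # and copies its indices and gradient rows into them.
    idx_buf = create_shared_mem_array(IDX_NAME, (256,), idx.dtype)
    grad_buf = create_shared_mem_array(GRAD_NAME, (256, grad.shape[1]), grad.dtype)
    idx_buf[:idx.shape[0]] = idx
    grad_buf[:grad.shape[0]] = grad

def consume_grads(num, grad_dim):
    # Consumer side: a peer trainer attaches to the same buffers by name and
    # reads only the first `num` valid rows.
    idx_buf = get_shared_mem_array(IDX_NAME, (256,), th.int64)
    grad_buf = get_shared_mem_array(GRAD_NAME, (256, grad_dim), th.float32)
    return idx_buf[:num], grad_buf[:num]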
@@ -424,10 +431,15 @@ class SparseAdagrad(SparseGradOptimizer):
emb_name = emb.name
if th.device(emb.emb_tensor.device) == th.device('cpu'):
# if our embedding is on the CPU, our state also has to be
if self._rank <= 0:
if self._rank < 0:
state = th.empty(
emb.weight.shape,
dtype=th.float32,
device=th.device('cpu')).zero_()
elif self._rank == 0:
state = create_shared_mem_array(emb_name+'_state', \
emb.weight.shape, th.float32).zero_()
if self._rank == 0:
if self._world_size > 1:
emb.store.set(emb_name+'_opt', emb_name)
elif self._rank > 0:
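The patch splits the old rank <= 0 branch into three cases: a non-distributed run (rank < 0) keeps its Adagrad state in an ordinary CPU tensor, rank 0 creates the shared-memory state and publishes its name through the embedding's store, and the (elided) rank > 0 branch attaches to it. A sketch of that shape; the rank > 0 attach and the store handshake are assumptions, not taken from the hunk:

import torch as th
from dgl.utils import create_shared_mem_array, get_shared_mem_array  # assumed import path

def alloc_adagrad_state(rank, world_size, shape, name, store=None):
    # Illustrative version of the per-rank allocation; the rank > 0 branch and
    # the store handshake are filled in here as assumptions.
    if rank < 0:
        # single-process (non-distributed) run: a plain local CPU tensor
        return th.zeros(shape, dtype=th.float32, device=th.device('cpu'))
    if rank == 0:
        # rank 0 creates the shared state and announces it through the store
        state = create_shared_mem_array(name + '_state', shape, th.float32).zero_()
        if world_size > 1 and store is not None:
            store.set(name + '_opt', name)
        return state
    # other ranks wait for the announcement, then attach to the same array
    if store is not None:
        store.wait([name + '_opt'])
    return get_shared_mem_array(name + '_state', shape, th.float32)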
@@ -538,14 +550,27 @@ class SparseAdam(SparseGradOptimizer):
emb_name = emb.name
if th.device(emb.emb_tensor.device) == th.device('cpu'):
# if our embedding is on the CPU, our state also has to be
if self._rank <= 0:
if self._rank < 0:
state_step = th.empty(
(emb.weight.shape[0],),
dtype=th.float32,
device=th.device('cpu')).zero_()
state_mem = th.empty(
emb.weight.shape,
dtype=th.float32,
device=th.device('cpu')).zero_()
state_power = th.empty(
emb.weight.shape,
dtype=th.float32,
device=th.device('cpu')).zero_()
elif self._rank == 0:
state_step = create_shared_mem_array(emb_name+'_step', \
(emb.weight.shape[0],), th.float32).zero_()
state_mem = create_shared_mem_array(emb_name+'_mem', \
emb.weight.shape, th.float32).zero_()
state_power = create_shared_mem_array(emb_name+'_power', \
emb.weight.shape, th.float32).zero_()
if self._rank == 0:
if self._world_size > 1:
emb.store.set(emb_name+'_opt', emb_name)
elif self._rank > 0:
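SparseAdam keeps three states per embedding, here placed in shared memory: '_step' (per-row update counts), '_mem', and '_power' (which serve as Adam's first and second moments). For reference, a standard per-row Adam update using those three quantities; this is a plain reimplementation of the textbook formula, not the DGL kernel, and the helper name and hyperparameter defaults are illustrative:

import torch as th

def sparse_adam_row_update(weight_row, grad_row, step, mem, power,
                           lr=0.01, beta1=0.9, beta2=0.999, eps=1e-8):
    # Reference math only: 'step' counts updates for this row, 'mem' and
    # 'power' play the role of Adam's first and second moments.
    step = step + 1
    mem = beta1 * mem + (1 - beta1) * grad_row
    power = beta2 * power + (1 - beta2) * grad_row * grad_row
    mem_hat = mem / (1 - beta1 ** step)          # bias correction
    power_hat = power / (1 - beta2 ** step)
    new_row = weight_row - lr * mem_hat / (th.sqrt(power_hat) + eps)
    return new_row, step, mem, power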
@@ -601,8 +626,8 @@ class SparseAdam(SparseGradOptimizer):
# only perform async copies cpu -> gpu, or gpu-> gpu, but block
# when copying to the cpu, so as to ensure the copy is finished
# before operating on the data on the cpu
state_nonblock = state_dev != th.device('cpu')
exec_nonblock = exec_dev != th.device('cpu')
state_nonblock = False # state_dev != th.device('cpu')
exec_nonblock = False # exec_dev != th.device('cpu')
# There can be duplicated indices due to sampling.
# Thus unique them here and average the gradient here.
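The patch conservatively disables non-blocking copies on both paths (the old expressions are left commented out). The general rule the surrounding comment describes, as a minimal sketch with an illustrative helper name:

import torch as th

def copy_to(dst_dev, src):
    # A copy may be asynchronous only when the destination is a GPU; a copy
    # landing on the CPU must block so the CPU never reads the data before
    # the transfer has finished.
    non_blocking = dst_dev != th.device('cpu')
    return src.to(dst_dev, non_blocking=non_blocking)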
......
import time
import multiprocessing as mp
import torch.multiprocessing as mp
import unittest, os
import pytest
@@ -87,102 +87,321 @@ def initializer(emb):
emb.uniform_(-1.0, 1.0)
return emb
def start_sparse_adam_worker(rank, world_size, has_zero_grad=False, num_embs=128, emb_dim=10):
def start_sparse_adam_worker(rank, device, world_size, weight, tensor_dev='cpu', has_zero_grad=False,
backend='gloo', num_embs=128, emb_dim=10):
print('start sparse worker for adam {}'.format(rank))
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
backend = 'gloo'
device=F.ctx()
if device.type == 'cuda':
th.cuda.set_device(device)
th.distributed.init_process_group(backend=backend,
init_method=dist_init_method,
world_size=world_size,
rank=rank)
init_weight = th.empty((num_embs, emb_dim))
th.manual_seed(0)
th.nn.init.uniform_(init_weight, -1.0, 1.0)
dgl_emb = NodeEmbedding(num_embs, emb_dim, 'test', init_func=initializer, device=tensor_dev)
dgl_emb.all_set_embedding(init_weight)
if has_zero_grad:
dgl_emb_zero = NodeEmbedding(num_embs, emb_dim, 'zero', init_func=initializer, device=tensor_dev)
dgl_adam = SparseAdam(params=[dgl_emb, dgl_emb_zero], lr=0.01)
else:
dgl_adam = SparseAdam(params=[dgl_emb], lr=0.01)
start = (num_embs // world_size) * rank
end = (num_embs // world_size) * (rank + 1)
th.manual_seed(rank)
idx = th.randint(start, end, size=(4,)).to(tensor_dev)
dgl_value = dgl_emb(idx, device)
labels = th.ones((4,)).long().to(device)
dgl_loss = th.nn.functional.cross_entropy(dgl_value, labels)
dgl_adam.zero_grad()
dgl_loss.backward()
dgl_adam.step()
th.distributed.barrier()
dgl_weight = dgl_emb.all_get_embedding().detach()
after_step = dgl_emb(idx, device).cpu()
if rank == 0:
dgl_value = dgl_value.detach().cpu()
assert F.allclose(dgl_value, after_step) is False
weight[:] = dgl_weight[:]
th.distributed.barrier()
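The worker hands its result back through the weight tensor the parent passed in (rank 0 writes into it in place). That works because torch.multiprocessing (the swapped import at the top of this file) shares tensors passed to spawned children, so the parent sees the child's writes; the updated tests rely on this to compare dgl_weight against torch_weight. A self-contained sketch of just that mechanism, all names illustrative:

import torch as th
import torch.multiprocessing as mp

def child(rank, out):
    # the in-place write into the shared tensor is visible to the parent
    out[rank] = rank + 1.0

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    out = th.zeros(2)     # shared with the children via torch.multiprocessing
    procs = [ctx.Process(target=child, args=(i, out)) for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    assert th.equal(out, th.tensor([1.0, 2.0]))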
def start_torch_adam_worker(rank, world_size, weight, has_zero_grad=False,
num_embs=128, emb_dim=10):
print('start sparse worker for adam {}'.format(rank))
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
backend='gloo'
th.distributed.init_process_group(backend=backend,
init_method=dist_init_method,
world_size=world_size,
rank=rank)
dgl_emb = NodeEmbedding(num_embs, emb_dim, 'test', init_func=initializer)
torch_emb = th.nn.Embedding(num_embs, emb_dim, sparse=True)
th.manual_seed(0)
th.nn.init.uniform_(torch_emb.weight, -1.0, 1.0)
torch_emb = th.nn.parallel.DistributedDataParallel(torch_emb)
if has_zero_grad:
dgl_emb_zero = NodeEmbedding(num_embs, emb_dim, 'zero', init_func=initializer)
torch_emb_zero = th.nn.Embedding(num_embs, emb_dim, sparse=True)
torch_emb_zero = torch_emb_zero.to(tensor_dev)
th.manual_seed(0)
th.nn.init.uniform_(torch_emb_zero.weight, -1.0, 1.0)
torch_emb_zero = th.nn.parallel.DistributedDataParallel(torch_emb_zero)
dgl_adam = SparseAdam(params=[dgl_emb, dgl_emb_zero], lr=0.01)
torch_adam = th.optim.SparseAdam(
list(torch_emb.module.parameters()) + list(torch_emb_zero.module.parameters()),
lr=0.01)
else:
dgl_adam = SparseAdam(params=[dgl_emb], lr=0.01)
torch_adam = th.optim.SparseAdam(list(torch_emb.module.parameters()), lr=0.01)
start = (num_embs // world_size) * rank
end = (num_embs // world_size) * (rank + 1)
th.manual_seed(rank)
idx = th.randint(start, end, size=(4,))
dgl_value = dgl_emb(idx, device).to(th.device('cpu'))
torch_value = torch_emb(idx)
labels = th.ones((4,)).long()
dgl_adam.zero_grad()
dgl_loss = th.nn.functional.cross_entropy(dgl_value, labels)
dgl_loss.backward()
dgl_adam.step()
torch_value = torch_emb(idx)
torch_loss = th.nn.functional.cross_entropy(torch_value, labels)
torch_adam.zero_grad()
torch_loss.backward()
torch_adam.step()
th.distributed.barrier()
if rank == 0:
after_step = dgl_emb(idx, device)
assert F.allclose(dgl_emb.weight, torch_emb.module.weight)
assert F.allclose(dgl_value, after_step) is False
weight[:] = torch_emb.module.weight.cpu()[:]
th.distributed.barrier()
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cuda', reason='cpu only test')
@pytest.mark.parametrize("num_workers", [2, 4])
def test_multiprocess_cpu_sparse_adam(num_workers):
backend = 'gloo'
worker_list = []
num_embs=128
emb_dim=10
dgl_weight = th.empty((num_embs, emb_dim))
ctx = mp.get_context('spawn')
for i in range(num_workers):
device = F.ctx()
p = ctx.Process(target=start_sparse_adam_worker,
args=(i, device, num_workers, dgl_weight, th.device('cpu'), True, backend))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
worker_list = []
torch_weight = th.empty((num_embs, emb_dim))
for i in range(num_workers):
p = ctx.Process(target=start_torch_adam_worker,
args=(i, num_workers, torch_weight, False))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
assert F.allclose(dgl_weight, torch_weight)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='gpu only test')
@pytest.mark.parametrize("num_workers", [2, 4, 8])
def test_multiprocess_sparse_adam(num_workers):
@pytest.mark.parametrize("backend", ['nccl', 'gloo'])
def test_multiprocess_sparse_adam(num_workers, backend):
if F.ctx().type == 'cuda' and th.cuda.device_count() < num_workers:
pytest.skip("Not enough GPUs to run test.")
worker_list = []
num_embs=128
emb_dim=10
dgl_weight = th.empty((num_embs, emb_dim))
ctx = mp.get_context('spawn')
for i in range(num_workers):
device = F.ctx()
if device.type == 'cuda':
# make sure each process has a unique GPU
device = th.device(i)
p = ctx.Process(target=start_sparse_adam_worker,
args=(i, device, num_workers, dgl_weight, th.device('cpu'), True, backend))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
worker_list = []
torch_weight = th.empty((num_embs, emb_dim))
for i in range(num_workers):
p = ctx.Process(target=start_torch_adam_worker,
args=(i, num_workers, torch_weight, False))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
assert F.allclose(dgl_weight, torch_weight)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='cuda tensor is not supported for cpu')
@pytest.mark.parametrize("num_workers", [2, 4, 8])
def test_multiprocess_sparse_adam_cuda_tensor(num_workers):
if F.ctx().type == 'cpu':
pytest.skip("Do not test CPU")
if F.ctx().type == 'cuda' and th.cuda.device_count() < num_workers:
pytest.skip("Not enough GPUs to run test.")
backend = 'nccl'
worker_list = []
num_embs=128
emb_dim=10
dgl_weight = th.empty((num_embs, emb_dim))
ctx = mp.get_context('spawn')
for i in range(num_workers):
device = th.device(i)
p = ctx.Process(target=start_sparse_adam_worker,
args=(i, num_workers))
args=(i, device, num_workers, dgl_weight, device, False, backend))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
worker_list = []
torch_weight = th.empty((num_embs, emb_dim))
for i in range(num_workers):
p = ctx.Process(target=start_torch_adam_worker,
args=(i, num_workers, torch_weight, False))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
assert F.allclose(dgl_weight, torch_weight)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cuda', reason='cpu only test')
@pytest.mark.parametrize("num_workers", [2, 4])
def test_multiprocess_sparse_adam_cpu_zero_step(num_workers):
backend = 'gloo'
worker_list = []
num_embs=128
emb_dim=10
dgl_weight = th.empty((num_embs, emb_dim))
ctx = mp.get_context('spawn')
for i in range(num_workers):
device = F.ctx()
p = ctx.Process(target=start_sparse_adam_worker,
args=(i, device, num_workers, dgl_weight, th.device('cpu'), True, backend))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
worker_list = []
torch_weight = th.empty((num_embs, emb_dim))
for i in range(num_workers):
p = ctx.Process(target=start_torch_adam_worker,
args=(i, num_workers, torch_weight, False))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
assert F.allclose(dgl_weight, torch_weight)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='gpu only test')
@pytest.mark.parametrize("num_workers", [2, 4, 8])
def test_multiprocess_sparse_adam_zero_step(num_workers):
@pytest.mark.parametrize("backend", ['nccl', 'gloo'])
def test_multiprocess_sparse_adam_zero_step(num_workers, backend):
if F.ctx().type == 'cuda' and th.cuda.device_count() < num_workers:
pytest.skip("Not enough GPUs to run test.")
worker_list = []
num_embs=128
emb_dim=10
dgl_weight = th.empty((num_embs, emb_dim))
ctx = mp.get_context('spawn')
for i in range(num_workers):
device = F.ctx()
if device.type == 'cuda':
# make sure each process has a unique GPU
device = th.device(i)
p = ctx.Process(target=start_sparse_adam_worker,
args=(i, device, num_workers, dgl_weight, th.device('cpu'), True, backend))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
worker_list = []
torch_weight = th.empty((num_embs, emb_dim))
for i in range(num_workers):
p = ctx.Process(target=start_torch_adam_worker,
args=(i, num_workers, torch_weight, False))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
assert F.allclose(dgl_weight, torch_weight)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='cuda tensor is not supported for cpu')
@pytest.mark.parametrize("num_workers", [2, 4, 8])
def test_multiprocess_sparse_adam_zero_step_cuda_tensor(num_workers):
if F.ctx().type == 'cuda' and th.cuda.device_count() < num_workers:
pytest.skip("Not enough GPUs to run test.")
backend = 'nccl'
worker_list = []
num_embs=128
emb_dim=10
dgl_weight = th.empty((num_embs, emb_dim))
ctx = mp.get_context('spawn')
for i in range(num_workers):
device = th.device(i)
p = ctx.Process(target=start_sparse_adam_worker,
args=(i, num_workers, True))
args=(i, device, num_workers, dgl_weight, device, True, backend))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
worker_list = []
torch_weight = th.empty((num_embs, emb_dim))
for i in range(num_workers):
p = ctx.Process(target=start_torch_adam_worker,
args=(i, num_workers, torch_weight, False))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
assert F.allclose(dgl_weight, torch_weight)
if __name__ == '__main__':
test_sparse_adam()
test_sparse_adam_zero_step()
test_multiprocess_sparse_adam(2)
test_multiprocess_sparse_adam(4)
test_multiprocess_cpu_sparse_adam(2)
test_multiprocess_cpu_sparse_adam(4)
test_multiprocess_cpu_sparse_adam(8)
test_multiprocess_sparse_adam_cpu_zero_step(2)
test_multiprocess_sparse_adam(2, backend='gloo')
test_multiprocess_sparse_adam(4, backend='gloo')
test_multiprocess_sparse_adam(8, backend='gloo')
test_multiprocess_sparse_adam(2, backend='nccl')
test_multiprocess_sparse_adam(4, backend='nccl')
test_multiprocess_sparse_adam(8, backend='nccl')
test_multiprocess_sparse_adam_zero_step(2, backend='gloo')
test_multiprocess_sparse_adam_zero_step(4, backend='nccl')
test_multiprocess_sparse_adam_zero_step(2)
test_multiprocess_sparse_adam_zero_step(4)
test_multiprocess_sparse_adam_cuda_tensor(2)
test_multiprocess_sparse_adam_zero_step_cuda_tensor(4)