Unverified commit edf2d526 authored by ndickson-nvidia, committed by GitHub

[Dist][Optim] Fixed race conditions in distributed SparseAdam and SparseAdagrad (#3971)

* Fixed a race condition in distributed/optim/pytorch/sparse_optim.py's SparseAdam::update, corresponding to the bug fixed in the non-distributed version in https://github.com/dmlc/dgl/pull/3013 , but using the newer Event-based approach from that corresponding function. Like the previously fixed bug, this race condition would often produce NaNs: https://github.com/dmlc/dgl/issues/2760

* Fixed the corresponding race condition in SparseAdagrad::update; the same details apply.

* Fixed a typo in all copies of a repeatedly-copied comment near the bug fixed three commits ago, and checked all nearby implementations for a corresponding bug. (All of them appear to have been fixed as of two commits ago.)

* Removed trailing whitespace
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>
Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
parent 6056ed62
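The hazard being fixed, as a minimal standalone sketch (hypothetical tensors, not the DGL code): a `non_blocking=True` copy from GPU to CPU only enqueues the transfer on the current CUDA stream, so a host-side read of the destination can race with the DMA and observe stale or partial data, which later surfaces as NaNs. Recording a `torch.cuda.Event` right after the copy gives a point to synchronize on before the host touches the result. The commit uses `Event.wait()`, which orders subsequent stream work; this sketch uses the host-blocking `Event.synchronize()`, which is what a standalone host-side read needs.

```python
import torch as th

# Hypothetical sketch of the race (assumes a CUDA device is available).
src = th.randn(1024, 128, device='cuda')
dst = src.to('cpu', non_blocking=True)  # transfer merely *enqueued*
# Reading dst here can race with the in-flight copy.

# Event-based fix: mark the stream position right after the copy, then
# make sure it has completed before the host uses the destination.
copy_done = th.cuda.Event()
dst2 = src.to('cpu', non_blocking=True)
copy_done.record()        # recorded on the current CUDA stream
# ... independent GPU work can still be queued here and overlap ...
copy_done.synchronize()   # block the host until the copy has landed
total = dst2.sum()        # safe: the data is now fully on the CPU
```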
@@ -109,7 +109,7 @@ class ExternalEmbedding:
     def update(self, gpu_id=-1):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
...
@@ -214,7 +214,7 @@ class ExternalEmbedding:
     def update(self, gpu_id=-1):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
...
@@ -142,7 +142,7 @@ class DistSparseGradOptimizer(abc.ABC):
     @abstractmethod
     def update(self, idx, grad, emb):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
@@ -215,7 +215,7 @@ class SparseAdagrad(DistSparseGradOptimizer):
     def update(self, idx, grad, emb):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
@@ -229,8 +229,15 @@ class SparseAdagrad(DistSparseGradOptimizer):
        """
        eps = self._eps
        clr = self._lr
+        state_dev = th.device('cpu')
        exec_dev = grad.device
+        # only perform async copies cpu -> gpu, or gpu -> gpu, but block
+        # when copying to the cpu, so as to ensure the copy is finished
+        # before operating on the data on the cpu
+        state_block = state_dev == th.device('cpu') and exec_dev != state_dev
        # the update is non-linear so indices must be unique
        grad_indices, inverse, cnt = th.unique(idx, return_inverse=True, return_counts=True)
        grad_values = th.zeros((grad_indices.shape[0], grad.shape[1]), device=exec_dev)
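The new `state_block` flag gates the extra synchronization: it is only needed when optimizer state lives on the CPU while the computation runs on a different (CUDA) device. A small sketch of the decision, reusing the diff's own names:

```python
import torch as th

# Sketch of the gating condition added above (names mirror the diff).
# Same-device copies, and cpu -> gpu copies consumed on that GPU's own
# stream, are already ordered; only gpu -> cpu write-backs need events.
state_dev = th.device('cpu')
exec_dev = th.device('cuda:0') if th.cuda.is_available() else state_dev
state_block = state_dev == th.device('cpu') and exec_dev != state_dev
print(state_block)  # True only for the CPU-state / GPU-compute mix
```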
@@ -239,14 +246,32 @@ class SparseAdagrad(DistSparseGradOptimizer):
        grad_sum = (grad_values * grad_values)
        # update grad state
-        grad_state = self._state[emb.name][grad_indices].to(exec_dev, non_blocking=True)
+        grad_state = self._state[emb.name][grad_indices].to(exec_dev)
        grad_state += grad_sum
-        self._state[emb.name][grad_indices] = grad_state.to(th.device('cpu'), non_blocking=True)
+        grad_state_dst = grad_state.to(state_dev, non_blocking=True)
+        if state_block:
+            # use events to try and overlap CPU and GPU as much as possible
+            update_event = th.cuda.Event()
+            update_event.record()
        # update emb
        std_values = grad_state.add_(eps).sqrt_()
        tmp = clr * grad_values / std_values
-        emb._tensor[grad_indices] -= tmp.to(th.device('cpu'), non_blocking=True)
+        tmp_dst = tmp.to(state_dev, non_blocking=True)
+        if state_block:
+            std_event = th.cuda.Event()
+            std_event.record()
+            # wait for our transfers from exec_dev to state_dev to finish
+            # before we can use them
+            update_event.wait()
+        self._state[emb.name][grad_indices] = grad_state_dst
+        if state_block:
+            # wait for the transfer of std_values to finish before we
+            # can use it
+            std_event.wait()
+        emb._tensor[grad_indices] -= tmp_dst

 class SparseAdam(DistSparseGradOptimizer):
     r''' Distributed Node embedding optimizer using the Adam algorithm.
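Putting the pieces together, a self-contained sketch of the restructured Adagrad step. This is a hypothetical free function over plain tensors, not the real method: the actual code updates DGL's distributed `_state` and `emb._tensor`, and uses `Event.wait()` where this sketch uses the host-blocking `synchronize()`.

```python
import torch as th

def adagrad_step(state, emb, grad_values, grad_indices, lr=0.01, eps=1e-10):
    # state, emb: CPU tensors of shape [num_emb, dim]; grad_values may
    # live on a GPU. Hypothetical stand-in for SparseAdagrad.update.
    exec_dev = grad_values.device
    state_block = exec_dev.type == 'cuda'   # CPU state, GPU compute

    grad_sum = grad_values * grad_values
    grad_state = state[grad_indices].to(exec_dev)
    grad_state += grad_sum
    grad_state_dst = grad_state.to('cpu', non_blocking=True)
    if state_block:
        update_event = th.cuda.Event()
        update_event.record()          # stream position after the copy

    # This math overlaps with the in-flight copy on the same stream.
    std_values = (grad_state + eps).sqrt_()
    tmp = lr * grad_values / std_values
    tmp_dst = tmp.to('cpu', non_blocking=True)
    if state_block:
        std_event = th.cuda.Event()
        std_event.record()
        update_event.synchronize()     # first copy must land before use
    state[grad_indices] = grad_state_dst
    if state_block:
        std_event.synchronize()        # second copy must land before use
    emb[grad_indices] -= tmp_dst

# Tiny CPU-only usage (the event path is skipped when exec_dev is CPU):
state = th.zeros(10, 4)
emb = th.randn(10, 4)
idx = th.tensor([0, 3, 3, 7])
grad = th.randn(4, 4)
grad_indices, inverse = th.unique(idx, return_inverse=True)
grad_values = th.zeros((grad_indices.shape[0], grad.shape[1]))
grad_values.index_add_(0, inverse, grad)
adagrad_step(state, emb, grad_values, grad_indices)
```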
@@ -312,7 +337,7 @@ class SparseAdam(DistSparseGradOptimizer):
     def update(self, idx, grad, emb):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
@@ -332,6 +357,12 @@ class SparseAdam(DistSparseGradOptimizer):
        state_dev = th.device('cpu')
        exec_dev = grad.device
+        # only perform async copies cpu -> gpu, or gpu -> gpu, but block
+        # when copying to the cpu, so as to ensure the copy is finished
+        # before operating on the data on the cpu
+        state_block = state_dev == th.device('cpu') and exec_dev != state_dev
        # the update is non-linear so indices must be unique
        grad_indices, inverse, cnt = th.unique(idx, return_inverse=True, return_counts=True)
        # update grad state
@@ -347,9 +378,9 @@ class SparseAdam(DistSparseGradOptimizer):
        # value (i.e., 0 in the first iteration) which will cause update_power_corr to be NaN
        state_val = state_step[state_idx] + 1
        state_step[state_idx] = state_val
-        state_step = state_val.to(exec_dev, non_blocking=True)
-        orig_mem = state_mem[state_idx].to(exec_dev, non_blocking=True)
-        orig_power = state_power[state_idx].to(exec_dev, non_blocking=True)
+        state_step = state_val.to(exec_dev)
+        orig_mem = state_mem[state_idx].to(exec_dev)
+        orig_power = state_power[state_idx].to(exec_dev)
        grad_values = th.zeros((grad_indices.shape[0], grad.shape[1]), device=exec_dev)
        grad_values.index_add_(0, inverse, grad)
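The comment about `update_power_corr` becoming NaN is why the step counter is bumped before it is copied to the execution device: Adam's bias correction divides by 1 - beta^t, which is zero at t = 0. A two-line demonstration:

```python
import torch as th

beta1 = 0.9
print(1. - th.pow(th.tensor(beta1), th.tensor(0.)))  # tensor(0.)    -> divide by zero
print(1. - th.pow(th.tensor(beta1), th.tensor(1.)))  # tensor(0.1000) -> safe
```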
@@ -358,8 +389,12 @@ class SparseAdam(DistSparseGradOptimizer):
        grad_power = grad_values * grad_values
        update_mem = beta1 * orig_mem + (1.-beta1) * grad_mem
        update_power = beta2 * orig_power + (1.-beta2) * grad_power
-        state_mem[state_idx] = update_mem.to(state_dev, non_blocking=True)
-        state_power[state_idx] = update_power.to(state_dev, non_blocking=True)
+        update_mem_dst = update_mem.to(state_dev, non_blocking=True)
+        update_power_dst = update_power.to(state_dev, non_blocking=True)
+        if state_block:
+            # use events to try and overlap CPU and GPU as much as possible
+            update_event = th.cuda.Event()
+            update_event.record()
        update_mem_corr = update_mem / (1. - th.pow(th.tensor(beta1, device=exec_dev),
                                                    state_step)).unsqueeze(1)
@@ -367,4 +402,19 @@ class SparseAdam(DistSparseGradOptimizer):
                                                      state_step)).unsqueeze(1)
        std_values = clr * update_mem_corr / (th.sqrt(update_power_corr) + eps)
-        emb._tensor[state_idx] -= std_values.to(state_dev)
+        std_values_dst = std_values.to(state_dev, non_blocking=True)
+        if state_block:
+            std_event = th.cuda.Event()
+            std_event.record()
+            # wait for our transfers from exec_dev to state_dev to finish
+            # before we can use them
+            update_event.wait()
+        state_mem[state_idx] = update_mem_dst
+        state_power[state_idx] = update_power_dst
+        if state_block:
+            # wait for the transfer of std_values to finish before we
+            # can use it
+            std_event.wait()
+        emb._tensor[state_idx] -= std_values_dst
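For completeness, the same two-event structure applied to Adam, again as a hedged standalone sketch (a hypothetical free function; the real method writes through DGL's state tensors and `emb._tensor`, and uses `Event.wait()` rather than `synchronize()`). What differs from the Adagrad sketch above is the pair of moment buffers and the bias-corrected step:

```python
import torch as th

def adam_step(state_mem, state_power, state_step, emb, grad_values,
              state_idx, lr=0.01, beta1=0.9, beta2=0.999, eps=1e-8):
    # All state_* and emb are CPU tensors; grad_values may live on a GPU.
    exec_dev = grad_values.device
    state_block = exec_dev.type == 'cuda'

    state_val = state_step[state_idx] + 1   # bump first: t == 0 would
    state_step[state_idx] = state_val       # zero the denominators below
    step = state_val.to(exec_dev)
    orig_mem = state_mem[state_idx].to(exec_dev)
    orig_power = state_power[state_idx].to(exec_dev)

    update_mem = beta1 * orig_mem + (1. - beta1) * grad_values
    update_power = beta2 * orig_power + (1. - beta2) * grad_values * grad_values

    # Enqueue both state write-backs, then mark the stream position.
    update_mem_dst = update_mem.to('cpu', non_blocking=True)
    update_power_dst = update_power.to('cpu', non_blocking=True)
    if state_block:
        update_event = th.cuda.Event()
        update_event.record()

    # The bias-corrected update overlaps with the in-flight copies.
    mem_corr = update_mem / (1. - beta1 ** step).unsqueeze(1)
    power_corr = update_power / (1. - beta2 ** step).unsqueeze(1)
    std_values = lr * mem_corr / (th.sqrt(power_corr) + eps)
    std_values_dst = std_values.to('cpu', non_blocking=True)
    if state_block:
        std_event = th.cuda.Event()
        std_event.record()
        update_event.synchronize()   # state copies must land before use
    state_mem[state_idx] = update_mem_dst
    state_power[state_idx] = update_power_dst
    if state_block:
        std_event.synchronize()      # update copy must land before use
    emb[state_idx] -= std_values_dst
```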
@@ -361,7 +361,7 @@ class SparseGradOptimizer(abc.ABC):
     @abstractmethod
     def update(self, idx, grad, emb):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
@@ -456,7 +456,7 @@ class SparseAdagrad(SparseGradOptimizer):
     def update(self, idx, grad, emb):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
@@ -600,7 +600,7 @@ class SparseAdam(SparseGradOptimizer):
     def update(self, idx, grad, emb):
        """ Update embeddings in a sparse manner
-        Sparse embeddings are updated in mini batches. we maintains gradient states for
+        Sparse embeddings are updated in mini batches. We maintain gradient states for
        each embedding so they can be updated separately.
        Parameters
...