Unverified Commit 80c26877 authored by xiang song(charlie.song), committed by GitHub

[Bug Fix] Fix munmap bug in sparse optimizer (#2675)



* Fix munmap bug

* lint

* update
Co-authored-by: Ubuntu <ubuntu@ip-172-31-56-220.ec2.internal>
parent 16e324da
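
Background for the fix: before this change, a retired shared-memory gradient buffer was released by dropping a per-call local holder (`shmem_ptr_holder`) and forcing `gc.collect()` in the receiving process. If a peer process still had the buffer mapped, the eager munmap could race with the peer's reads and crash training. The commit instead parks retired buffers in a persistent `self.shmem_buffer_holder` list on the optimizer, keeping each mapping alive until every process has moved to the replacement buffer. Below is a minimal sketch of the pattern using only PyTorch; `GradBufferCache` and `get_buffer` are illustrative names (not DGL's API), and peer attachment to the buffer by name is abstracted away:

```python
# Sketch of the "park, don't free" pattern this commit adopts.
import torch as th

class GradBufferCache:
    """Hypothetical cache of shared-memory gradient buffers."""
    def __init__(self):
        self._cache = {}               # name -> shared-memory tensor
        self.shmem_buffer_holder = []  # retired buffers, kept alive on purpose

    def get_buffer(self, name, size, dim):
        buf = self._cache.get(name)
        if buf is None or buf.shape[0] < size:
            if buf is not None:
                # Do NOT drop the last reference here: a peer process may
                # still have this buffer mapped. Parking it in the holder
                # defers the munmap until everyone uses the new buffer.
                self.shmem_buffer_holder.append(buf)
            # Grow geometrically (2x) with a floor of 128 rows, mirroring
            # the sizing policy introduced by this commit.
            new_size = 128 if size < 128 else size * 2
            buf = th.empty(new_size, dim).share_memory_()
            self._cache[name] = buf
        # Only the first `size` rows carry valid data this round.
        return buf[:size]
```
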
"""Node embedding optimizers""" """Node embedding optimizers"""
import abc import abc
from abc import abstractmethod from abc import abstractmethod
import gc
import torch as th import torch as th
from ...utils import get_shared_mem_array, create_shared_mem_array from ...utils import get_shared_mem_array, create_shared_mem_array
@@ -27,6 +26,9 @@ class SparseGradOptimizer(abc.ABC):
         self._shared_cache = {}
         self._clean_grad = False
         self._opt_meta = {}
+        # hold released shared memory to let other processes munmap it first;
+        # otherwise it will crash the training
+        self.shmem_buffer_holder = []
 
         for emb in params:
             assert isinstance(emb, NodeEmbedding), \
@@ -65,10 +67,6 @@ class SparseGradOptimizer(abc.ABC):
         # We cache shared memory buffers in shared_emb.
         shared_emb = {emb.name: ([], []) for emb in self._params}
-        # hold released shared memory to let other process to munmap it first
-        # unless it will crash the training
-        shmem_ptr_holder = []
-
         # Go through all sparse embeddings
         for emb in self._params: # pylint: disable=too-many-nested-blocks
             emb_name = emb.name
@@ -130,16 +128,23 @@ class SparseGradOptimizer(abc.ABC):
                             < idx_i.shape[0]:
                         if idx_shmem_name in self._shared_cache[emb_name]:
-                            shmem_ptr_holder.append(
+                            self.shmem_buffer_holder.append(
                                 self._shared_cache[emb_name][idx_shmem_name])
-                            shmem_ptr_holder.append(
+                            self.shmem_buffer_holder.append(
                                 self._shared_cache[emb_name][grad_shmem_name])
-                        # in case idx_i.shape[0] is 0
+                        # The total number of buffers is the number of
+                        # NodeEmbeddings * world_size * (world_size - 1).
+                        # The minimum buffer size is 128.
+                        #
+                        # We extend the buffer by idx_i.shape[0] * 2 to avoid
+                        # frequent shared memory allocation.
+                        # The overall buffer cost will be smaller than three times
+                        # the maximum memory requirement for sharing gradients.
+                        buffer_size = 128 if idx_i.shape[0] < 128 else idx_i.shape[0] * 2
                         idx_shmem = create_shared_mem_array(idx_shmem_name, \
-                            (idx_i.shape[0] * 2 + 2,), idx_dtype)
+                            (buffer_size,), idx_dtype)
                         grad_shmem = create_shared_mem_array(grad_shmem_name, \
-                            (idx_i.shape[0] * 2 + 2, grad_dim), grad_dtype)
+                            (buffer_size, grad_dim), grad_dtype)
                         self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
                         self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
@@ -170,16 +175,14 @@ class SparseGradOptimizer(abc.ABC):
                     # tensor that is sent to current training process
                     if idx_shmem_name not in self._shared_cache[emb_name] or \
                         self._shared_cache[emb_name][idx_shmem_name].shape[0] < size:
+                        buffer_size = 128 if size < 128 else size * 2
                         idx_shmem = get_shared_mem_array(idx_shmem_name, \
-                            (size * 2 + 2,), idx_dtype)
+                            (buffer_size,), idx_dtype)
                         grad_shmem = get_shared_mem_array(grad_shmem_name, \
-                            (size * 2 + 2, grad_dim), grad_dtype)
+                            (buffer_size, grad_dim), grad_dtype)
                         self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
                         self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
-                    # make sure shared memory are released in child process first
-                    # This will not be called frequently
-                    # TODO(xiangsx) Provide API to mumap shared memory directly
-                    gc.collect()
                     idx_i = self._shared_cache[emb_name][idx_shmem_name][:size]
                     grad_i = self._shared_cache[emb_name][grad_shmem_name][:size]
                     shared_emb[emb_name][0].append(idx_i.to(device,
...
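
In the last hunk, the receiving process re-derives the same `buffer_size` formula so it attaches to a shared array whose shape matches what the sender created. A sketch of that consumer side, assuming `dgl.utils` exposes the `get_shared_mem_array` helper imported at the top of the file (the wrapper `attach_grad_buffers` and its defaults are hypothetical):

```python
# Sketch of the consumer-side sizing, mirroring the hunk above.
import torch as th
from dgl.utils import get_shared_mem_array  # helper used in the diff

def attach_grad_buffers(idx_shmem_name, grad_shmem_name, size, grad_dim,
                        idx_dtype=th.int64, grad_dtype=th.float32):
    # Same formula as the producer, so both sides agree on the array shape.
    buffer_size = 128 if size < 128 else size * 2
    idx_shmem = get_shared_mem_array(idx_shmem_name, (buffer_size,), idx_dtype)
    grad_shmem = get_shared_mem_array(grad_shmem_name,
                                      (buffer_size, grad_dim), grad_dtype)
    # Only the first `size` rows carry valid gradients this round.
    return idx_shmem[:size], grad_shmem[:size]
```

With retired buffers parked in `self.shmem_buffer_holder`, the `gc.collect()` call (and its TODO) becomes unnecessary, which also removes a full garbage-collection pass from every optimizer step.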