Unverified Commit c864c910 authored by Muhammed Fatih BALIN, committed by GitHub

[GraphBolt][CUDA] Inplace pin memory for Graph and TorchFeatureStore (#6962)

parent 053c8221
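
For readers unfamiliar with the trick used throughout this commit: torch.Tensor.pin_memory() always allocates a new page-locked buffer and copies into it, whereas cudaHostRegister page-locks the memory a tensor already owns. The sketch below is a hypothetical standalone class, not part of the diff, assuming a CUDA-enabled PyTorch build; it shows the register-then-unregister pattern the commit applies to FusedCSCSamplingGraph and TorchBasedFeature.

import torch


class InplacePinnable:
    """Minimal sketch of in-place pinning via cudaHostRegister."""

    def __init__(self, tensor: torch.Tensor):
        # Registration works on a single contiguous allocation.
        self._tensor = tensor.contiguous()
        self._is_inplace_pinned = set()

    def pin_memory_(self):
        x = self._tensor
        if x.device.type == "cpu" and not x.is_pinned():
            # Flags argument 0 selects the default behavior; a return value of
            # 0 (cudaSuccess) means the existing buffer is now page-locked.
            assert (
                torch.cuda.cudart().cudaHostRegister(
                    x.data_ptr(), x.numel() * x.element_size(), 0
                )
                == 0
            )
            self._is_inplace_pinned.add(x)

    def __del__(self):
        # Memory pinned with cudaHostRegister must be released explicitly
        # before the underlying allocation is freed.
        for x in self._is_inplace_pinned:
            assert torch.cuda.cudart().cudaHostUnregister(x.data_ptr()) == 0
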
@@ -34,6 +34,17 @@ class FusedCSCSamplingGraph(SamplingGraph):
     ):
         super().__init__()
         self._c_csc_graph = c_csc_graph
+        self._is_inplace_pinned = set()
+
+    def __del__(self):
+        # torch.Tensor.pin_memory() is not an inplace operation. To make it
+        # truly in-place, we need to use cudaHostRegister. Then, we need to use
+        # cudaHostUnregister to unpin the tensor in the destructor.
+        # https://github.com/pytorch/pytorch/issues/32167#issuecomment-753551842
+        for tensor in self._is_inplace_pinned:
+            assert (
+                torch.cuda.cudart().cudaHostUnregister(tensor.data_ptr()) == 0
+            )
 
     @property
     def total_num_nodes(self) -> int:
@@ -974,9 +985,33 @@ class FusedCSCSamplingGraph(SamplingGraph):
     def pin_memory_(self):
         """Copy `FusedCSCSamplingGraph` to the pinned memory in-place."""
+        # torch.Tensor.pin_memory() is not an inplace operation. To make it
+        # truly in-place, we need to use cudaHostRegister. Then, we need to use
+        # cudaHostUnregister to unpin the tensor in the destructor.
+        # https://github.com/pytorch/pytorch/issues/32167#issuecomment-753551842
+        cudart = torch.cuda.cudart()
 
         def _pin(x):
-            return x.pin_memory() if hasattr(x, "pin_memory") else x
+            if hasattr(x, "pin_memory_"):
+                x.pin_memory_()
+            elif (
+                isinstance(x, torch.Tensor)
+                and not x.is_pinned()
+                and x.device.type == "cpu"
+            ):
+                assert (
+                    x.is_contiguous()
+                ), "Tensor pinning is only supported for contiguous tensors."
+                assert (
+                    cudart.cudaHostRegister(
+                        x.data_ptr(), x.numel() * x.element_size(), 0
+                    )
+                    == 0
+                )
+                self._is_inplace_pinned.add(x)
+
+            return x
 
         self._apply_to_members(_pin)
......
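
As a quick intuition check for the _pin helper above, here is the observable difference between the two pinning paths; this is a sketch assuming a CUDA-enabled build, and the data_ptr() comparison is exactly what the tests further down assert.

import torch

if torch.cuda.is_available():
    x = torch.arange(4, dtype=torch.float32)

    # Out-of-place: pin_memory() returns a new page-locked tensor.
    y = x.pin_memory()
    assert y.is_pinned() and y.data_ptr() != x.data_ptr()

    # In-place: register the buffer x already owns; its pointer is unchanged.
    cudart = torch.cuda.cudart()
    nbytes = x.numel() * x.element_size()
    assert cudart.cudaHostRegister(x.data_ptr(), nbytes, 0) == 0
    assert x.is_pinned()
    assert cudart.cudaHostUnregister(x.data_ptr()) == 0
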
@@ -83,6 +83,17 @@ class TorchBasedFeature(Feature):
         # Make sure the tensor is contiguous.
         self._tensor = torch_feature.contiguous()
         self._metadata = metadata
+        self._is_inplace_pinned = set()
+
+    def __del__(self):
+        # torch.Tensor.pin_memory() is not an inplace operation. To make it
+        # truly in-place, we need to use cudaHostRegister. Then, we need to use
+        # cudaHostUnregister to unpin the tensor in the destructor.
+        # https://github.com/pytorch/pytorch/issues/32167#issuecomment-753551842
+        for tensor in self._is_inplace_pinned:
+            assert (
+                torch.cuda.cudart().cudaHostUnregister(tensor.data_ptr()) == 0
+            )
 
     def read(self, ids: torch.Tensor = None):
         """Read the feature by index.
@@ -169,14 +180,30 @@ class TorchBasedFeature(Feature):
     def pin_memory_(self):
         """In-place operation to copy the feature to pinned memory."""
-        self._tensor = self._tensor.pin_memory()
+        # torch.Tensor.pin_memory() is not an inplace operation. To make it
+        # truly in-place, we need to use cudaHostRegister. Then, we need to use
+        # cudaHostUnregister to unpin the tensor in the destructor.
+        # https://github.com/pytorch/pytorch/issues/32167#issuecomment-753551842
+        x = self._tensor
+        if not x.is_pinned() and x.device.type == "cpu":
+            assert (
+                x.is_contiguous()
+            ), "Tensor pinning is only supported for contiguous tensors."
+            assert (
+                torch.cuda.cudart().cudaHostRegister(
+                    x.data_ptr(), x.numel() * x.element_size(), 0
+                )
+                == 0
+            )
+            self._is_inplace_pinned.add(x)
 
     def to(self, device):  # pylint: disable=invalid-name
         """Copy `TorchBasedFeature` to the specified device."""
         # copy.copy is a shallow copy so it does not copy tensor memory.
         self2 = copy.copy(self)
         if device == "pinned":
-            self2.pin_memory_()
+            self2._tensor = self2._tensor.pin_memory()
         else:
             self2._tensor = self2._tensor.to(device)
         return self2
......
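
One design note on the to(device) change above: copy.copy yields a shallow copy that shares both _tensor and the _is_inplace_pinned set with the original, so calling pin_memory_() on the copy would register the original's buffer and risk a double cudaHostUnregister from the two destructors, which is presumably why the commit keeps to("pinned") on the out-of-place pin_memory(). A small sketch of the sharing behavior (hypothetical Holder class, assuming a CUDA-enabled build):

import copy

import torch


class Holder:
    def __init__(self, t):
        self._tensor = t


if torch.cuda.is_available():
    a = Holder(torch.zeros(4))
    b = copy.copy(a)
    assert b._tensor is a._tensor  # shallow copy: the tensor object is shared

    # Rebinding the copy's attribute leaves the original unpinned and untouched.
    b._tensor = b._tensor.pin_memory()
    assert b._tensor.is_pinned() and not a._tensor.is_pinned()
    assert b._tensor.data_ptr() != a._tensor.data_ptr()
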
@@ -1601,10 +1601,14 @@ def test_csc_sampling_graph_to_device(device):
 def test_csc_sampling_graph_to_pinned_memory():
     # Construct FusedCSCSamplingGraph.
     graph = create_fused_csc_sampling_graph()
+    ptr = graph.csc_indptr.data_ptr()
 
     # Copy to pinned_memory in-place.
     graph.pin_memory_()
+    # Check if pinning is truly in-place.
+    assert graph.csc_indptr.data_ptr() == ptr
 
     is_graph_on_device_type(graph, "cpu")
     is_graph_pinned(graph)
......
@@ -221,6 +221,9 @@ def test_torch_based_pinned_feature(dtype, idtype, shape):
     feature = gb.TorchBasedFeature(tensor)
     feature.pin_memory_()
+    # Check if pinning is truly in-place.
+    assert feature._tensor.data_ptr() == tensor.data_ptr()
 
     # Test read entire pinned feature, the result should be on cuda.
     assert torch.equal(feature.read(), test_tensor_cuda)
     assert feature.read().is_cuda
......
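
Finally, a rough end-to-end usage sketch of the feature path (assumptions: dgl.graphbolt imports as gb, a CUDA device is available, and read() on a pinned feature returns a CUDA tensor, as the test above expects):

import torch

import dgl.graphbolt as gb

tensor = torch.randn(128, 16)
feature = gb.TorchBasedFeature(tensor)
ptr = feature._tensor.data_ptr()

feature.pin_memory_()                      # page-locks the existing buffer
assert feature._tensor.data_ptr() == ptr   # no new allocation, no copy
assert feature._tensor.is_pinned()
assert feature.read().is_cuda              # reads are served straight to the GPU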