Unverified Commit bd74c44c authored by Muhammed Fatih BALIN's avatar Muhammed Fatih BALIN Committed by GitHub
Browse files

[GraphBolt][CUDA] Add `.to()` method to Graph and FeatureStore. (#6957)

parent 6f9db813
...@@ -956,7 +956,21 @@ class FusedCSCSamplingGraph(SamplingGraph): ...@@ -956,7 +956,21 @@ class FusedCSCSamplingGraph(SamplingGraph):
def _to(x): def _to(x):
return x.to(device) if hasattr(x, "to") else x return x.to(device) if hasattr(x, "to") else x
return self._apply_to_members(_to) def _pin(x):
return x.pin_memory() if hasattr(x, "pin_memory") else x
# Create a copy of self.
self2 = fused_csc_sampling_graph(
self.csc_indptr,
self.indices,
self.node_type_offset,
self.type_per_edge,
self.node_type_to_id,
self.edge_type_to_id,
self.node_attributes,
self.edge_attributes,
)
return self2._apply_to_members(_pin if device == "pinned" else _to)
def pin_memory_(self): def pin_memory_(self):
"""Copy `FusedCSCSamplingGraph` to the pinned memory in-place.""" """Copy `FusedCSCSamplingGraph` to the pinned memory in-place."""
......
"""Torch-based feature store for GraphBolt.""" """Torch-based feature store for GraphBolt."""
import copy
import textwrap import textwrap
from typing import Dict, List from typing import Dict, List
...@@ -170,6 +171,16 @@ class TorchBasedFeature(Feature): ...@@ -170,6 +171,16 @@ class TorchBasedFeature(Feature):
"""In-place operation to copy the feature to pinned memory.""" """In-place operation to copy the feature to pinned memory."""
self._tensor = self._tensor.pin_memory() self._tensor = self._tensor.pin_memory()
def to(self, device):  # pylint: disable=invalid-name
    """Return a copy of this `TorchBasedFeature` on the given device.

    Passing ``"pinned"`` keeps the tensor on the host but moves it into
    page-locked (pinned) memory; any other value is forwarded to
    ``Tensor.to``. The original feature is left untouched.
    """
    # Shallow-copy the wrapper object only: tensor storage is shared
    # until the `_tensor` attribute is rebound below, so the source
    # feature's tensor is never modified.
    copied = copy.copy(self)
    if device != "pinned":
        copied._tensor = copied._tensor.to(device)
    else:
        copied.pin_memory_()
    return copied
def __repr__(self) -> str: def __repr__(self) -> str:
ret = ( ret = (
"{Classname}(\n" "{Classname}(\n"
...@@ -255,6 +266,13 @@ class TorchBasedFeatureStore(BasicFeatureStore): ...@@ -255,6 +266,13 @@ class TorchBasedFeatureStore(BasicFeatureStore):
for feature in self._features.values(): for feature in self._features.values():
feature.pin_memory_() feature.pin_memory_()
def to(self, device):  # pylint: disable=invalid-name
    """Return a copy of this `TorchBasedFeatureStore` on ``device``.

    Each contained feature is copied via its own ``to`` method; the
    store itself is only shallow-copied, so no tensor memory is
    duplicated by this wrapper.
    """
    copied = copy.copy(self)
    moved = {}
    for name, feature in self._features.items():
        moved[name] = feature.to(device)
    copied._features = moved
    return copied
def __repr__(self) -> str: def __repr__(self) -> str:
ret = "{Classname}(\n" + " {features}\n" + ")" ret = "{Classname}(\n" + " {features}\n" + ")"
features_str = textwrap.indent(str(self._features), " ").strip() features_str = textwrap.indent(str(self._features), " ").strip()
......
...@@ -1552,25 +1552,46 @@ def create_fused_csc_sampling_graph(): ...@@ -1552,25 +1552,46 @@ def create_fused_csc_sampling_graph():
) )
def is_graph_on_device_type(graph, device_type):
    """Assert every tensor member of ``graph`` lives on ``device_type``.

    Covers the CSC structure tensors, the heterogeneous-graph metadata
    tensors, and all edge-attribute tensors. Raises ``AssertionError``
    on the first tensor found elsewhere.
    """
    assert graph.csc_indptr.device.type == device_type
    assert graph.indices.device.type == device_type
    assert graph.node_type_offset.device.type == device_type
    # The original checked `csc_indptr` twice; the duplicate is removed.
    assert graph.type_per_edge.device.type == device_type
    for key in graph.edge_attributes:
        assert graph.edge_attributes[key].device.type == device_type
def is_graph_pinned(graph):
    """Assert every tensor member of ``graph`` is in pinned host memory.

    Covers the CSC structure tensors, the heterogeneous-graph metadata
    tensors, and all edge-attribute tensors. Raises ``AssertionError``
    on the first unpinned tensor.
    """
    assert graph.csc_indptr.is_pinned()
    assert graph.indices.is_pinned()
    assert graph.node_type_offset.is_pinned()
    # The original checked `csc_indptr` twice; the duplicate is removed.
    assert graph.type_per_edge.is_pinned()
    for key in graph.edge_attributes:
        assert graph.edge_attributes[key].is_pinned()
@unittest.skipIf(
    F._default_context_str == "cpu",
    reason="`to` function needs GPU to test.",
)
@pytest.mark.parametrize("device", ["pinned", "cuda"])
def test_csc_sampling_graph_to_device(device):
    """`to` returns a copy on `device` while the original stays on CPU."""
    # Construct FusedCSCSamplingGraph on the CPU.
    graph = create_fused_csc_sampling_graph()

    # Copy to the requested device.
    graph2 = graph.to(device)

    if device == "cuda":
        is_graph_on_device_type(graph2, "cuda")
    elif device == "pinned":
        # Pinned memory is still host (CPU) memory, just page-locked.
        is_graph_on_device_type(graph2, "cpu")
        is_graph_pinned(graph2)

    # The original variable should be untouched.
    is_graph_on_device_type(graph, "cpu")
@unittest.skipIf( @unittest.skipIf(
...@@ -1584,14 +1605,8 @@ def test_csc_sampling_graph_to_pinned_memory(): ...@@ -1584,14 +1605,8 @@ def test_csc_sampling_graph_to_pinned_memory():
# Copy to pinned_memory in-place. # Copy to pinned_memory in-place.
graph.pin_memory_() graph.pin_memory_()
# Check. is_graph_on_device_type(graph, "cpu")
assert graph.csc_indptr.is_pinned() is_graph_pinned(graph)
assert graph.indices.is_pinned()
assert graph.node_type_offset.is_pinned()
assert graph.type_per_edge.is_pinned()
assert graph.csc_indptr.is_pinned()
for key in graph.edge_attributes:
assert graph.edge_attributes[key].is_pinned()
@pytest.mark.parametrize("labor", [False, True]) @pytest.mark.parametrize("labor", [False, True])
......
...@@ -136,6 +136,59 @@ def test_torch_based_feature(in_memory): ...@@ -136,6 +136,59 @@ def test_torch_based_feature(in_memory):
feature_a = feature_b = None feature_a = feature_b = None
def is_feature_store_pinned(store):
    """Assert every feature tensor in ``store`` is in pinned host memory."""
    tensors = [feature._tensor for feature in store._features.values()]
    for tensor in tensors:
        assert tensor.is_pinned()
def is_feature_store_on_cuda(store):
    """Assert every feature tensor in ``store`` resides on a CUDA device."""
    tensors = [feature._tensor for feature in store._features.values()]
    for tensor in tensors:
        assert tensor.is_cuda
def is_feature_store_on_cpu(store):
    """Assert no feature tensor in ``store`` resides on a CUDA device."""
    tensors = [feature._tensor for feature in store._features.values()]
    for tensor in tensors:
        assert not tensor.is_cuda
@unittest.skipIf(
    F._default_context_str == "cpu",
    reason="Tests for pinned memory are only meaningful on GPU.",
)
@pytest.mark.parametrize("device", ["pinned", "cuda"])
def test_feature_store_to_device(device):
    """Copying a feature store to a device must not modify the source."""
    with tempfile.TemporaryDirectory() as test_dir:
        # Materialize one torch-format and one numpy-format feature on disk.
        tensor_a = torch.tensor([[1, 2, 4], [2, 5, 3]])
        tensor_b = torch.tensor([[[1, 2], [3, 4]], [[2, 5], [3, 4]]])
        write_tensor_to_disk(test_dir, "a", tensor_a, fmt="torch")
        write_tensor_to_disk(test_dir, "b", tensor_b, fmt="numpy")
        metadata = [
            gb.OnDiskFeatureData(
                domain="node",
                type="paper",
                name="a",
                format="torch",
                path=os.path.join(test_dir, "a.pt"),
            ),
            gb.OnDiskFeatureData(
                domain="edge",
                type="paper:cites:paper",
                name="b",
                format="numpy",
                path=os.path.join(test_dir, "b.npy"),
            ),
        ]
        store = gb.TorchBasedFeatureStore(metadata)

        copied = store.to(device)
        if device == "pinned":
            is_feature_store_pinned(copied)
        elif device == "cuda":
            is_feature_store_on_cuda(copied)

        # The original variable should be untouched.
        is_feature_store_on_cpu(store)
@unittest.skipIf( @unittest.skipIf(
F._default_context_str == "cpu", F._default_context_str == "cpu",
reason="Tests for pinned memory are only meaningful on GPU.", reason="Tests for pinned memory are only meaningful on GPU.",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment