"...git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "168e5b7ffa4949fca82ed2fcd17d3451c5804401"
Unverified commit c997434c, authored by Xinyu Yao and committed by GitHub

[GraphBolt][CUDA] Update `copy_to`. (#7332)


Co-authored-by: Ubuntu <ubuntu@ip-172-31-0-133.us-west-2.compute.internal>
parent 324fd976
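
This change removes the ``extra_attrs`` argument from GraphBolt's ``copy_to``/``CopyTo``: the mini-batch attributes to transfer are now inferred automatically, so call sites pass only the target device. A minimal sketch of the updated pattern, assuming ``itemset``, ``graph``, and ``feature`` are set up as in the example scripts changed below and a CUDA device is available:

    import torch
    import dgl.graphbolt as gb

    device = torch.device("cuda")
    datapipe = gb.ItemSampler(itemset, batch_size=1024, shuffle=True)
    # No extra_attrs: the relevant MiniBatch attributes (seeds included)
    # are moved to `device` automatically.
    datapipe = datapipe.copy_to(device)
    # Sampling and feature fetching now run on the GPU.
    datapipe = datapipe.sample_neighbor(graph, [4, 4])
    datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
    dataloader = gb.DataLoader(datapipe)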
@@ -133,7 +133,7 @@ def create_dataloader(
     # A CopyTo object copying data in the datapipe to a specified device.
     ############################################################################
     if args.storage_device != "cpu":
-        datapipe = datapipe.copy_to(device, extra_attrs=["seed_nodes"])
+        datapipe = datapipe.copy_to(device)
     datapipe = datapipe.sample_neighbor(graph, args.fanout)
     datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])
     if args.storage_device == "cpu":
......
@@ -95,13 +95,12 @@ def create_dataloader(
     # self.copy_to()
     # [Input]:
     # 'device': The device to copy the data to.
-    # 'extra_attrs': The extra attributes to copy.
     # [Output]:
     # A CopyTo object to copy the data to the specified device. Copying here
     # ensures that the rest of the operations run on the GPU.
     ############################################################################
     if args.storage_device != "cpu":
-        datapipe = datapipe.copy_to(device=device, extra_attrs=["seeds"])
+        datapipe = datapipe.copy_to(device=device)
     ############################################################################
     # [Step-3]:
......
@@ -122,7 +122,7 @@ def create_dataloader(
         graph, fanout if job != "infer" else [-1]
     )
     # Copy the data to the specified device.
-    datapipe = datapipe.copy_to(device=device, extra_attrs=["input_nodes"])
+    datapipe = datapipe.copy_to(device=device)
     # Fetch node features for the sampled subgraph.
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
     # Create and return a DataLoader to handle data loading.
......
@@ -179,14 +179,14 @@ def create_dataloader(
     )
     # Copy the data to the specified device.
     if args.graph_device != "cpu":
-        datapipe = datapipe.copy_to(device=device, extra_attrs=["seeds"])
+        datapipe = datapipe.copy_to(device=device)
     # Sample neighbors for each node in the mini-batch.
     datapipe = getattr(datapipe, args.sample_mode)(
         graph, fanout if job != "infer" else [-1]
     )
     # Copy the data to the specified device.
     if args.feature_device != "cpu":
-        datapipe = datapipe.copy_to(device=device, extra_attrs=["input_nodes"])
+        datapipe = datapipe.copy_to(device=device)
     # Fetch node features for the sampled subgraph.
     datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])
     # Copy the data to the specified device.
......
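
The hunk above keeps two independent transfer points: seeds move to the GPU before sampling only when the graph is GPU-resident, and the sampled mini-batch moves before feature fetching only when the feature store is GPU-resident. A condensed sketch of that staged pattern, assuming ``args``, ``device``, ``graph``, and ``features`` are defined as in the surrounding script:

    # Stage 1: move seeds before sampling if the graph lives on the GPU.
    if args.graph_device != "cpu":
        datapipe = datapipe.copy_to(device=device)
    datapipe = datapipe.sample_neighbor(graph, args.fanout)
    # Stage 2: move the sampled mini-batch before fetching GPU-resident features.
    if args.feature_device != "cpu":
        datapipe = datapipe.copy_to(device=device)
    datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])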
@@ -18,7 +18,7 @@ def create_dataloader(dataset, itemset, device):
     datapipe = gb.ItemSampler(itemset, batch_size=16)
     # Copy the mini-batch to the designated device for sampling and training.
-    datapipe = datapipe.copy_to(device, extra_attrs=["seeds"])
+    datapipe = datapipe.copy_to(device)
     # Sample neighbors for the seed nodes.
     datapipe = datapipe.sample_neighbor(dataset.graph, fanouts=[4, 2])
......
@@ -117,7 +117,7 @@ def create_dataloader(
     # Move the mini-batch to the appropriate device.
     # `device`:
     #   The device to move the mini-batch to.
-    datapipe = datapipe.copy_to(device, extra_attrs=["seeds"])
+    datapipe = datapipe.copy_to(device)
     # Sample neighbors for each seed node in the mini-batch.
     # `graph`:
......
@@ -140,7 +140,7 @@
     "        shuffle=is_train,\n",
     "        drop_uneven_inputs=is_train,\n",
     "    )\n",
-    "    datapipe = datapipe.copy_to(device, extra_attrs=[\"seed_nodes\"])\n",
+    "    datapipe = datapipe.copy_to(device)\n",
     "    # Now that we have moved to device, sample_neighbor and fetch_feature steps\n",
     "    # will be executed on GPUs.\n",
     "    datapipe = datapipe.sample_neighbor(graph, [10, 10, 10])\n",
......
@@ -143,7 +143,7 @@
     "source": [
     "def create_dataloader(itemset, shuffle):\n",
     "    datapipe = gb.ItemSampler(itemset, batch_size=1024, shuffle=shuffle)\n",
-    "    datapipe = datapipe.copy_to(device, extra_attrs=[\"seeds\"])\n",
+    "    datapipe = datapipe.copy_to(device)\n",
     "    datapipe = datapipe.sample_neighbor(graph, [4, 4])\n",
     "    datapipe = datapipe.fetch_feature(feature, node_feature_keys=[\"feat\"])\n",
     "    return gb.DataLoader(datapipe)"
......
@@ -195,8 +195,7 @@ def apply_to(x, device):
 class CopyTo(IterDataPipe):
     """DataPipe that transfers each element yielded from the previous DataPipe
     to the given device. For MiniBatch, only the related attributes
-    (automatically inferred) will be transferred by default. If you want to
-    transfer any other attributes, indicate them in the ``extra_attrs``.
+    (automatically inferred) will be transferred by default.

     Functional name: :obj:`copy_to`.
@@ -208,64 +207,22 @@ class CopyTo(IterDataPipe):
        for data in datapipe:
            yield data.to(device)

-    For :class:`~dgl.graphbolt.MiniBatch`, only a part of the attributes will
-    be transferred by default, to accelerate the process:
-
-    - When ``seed_nodes`` is not None and ``node_pairs`` is None, a node
-      related task is inferred. Only ``labels``, ``sampled_subgraphs``,
-      ``node_features`` and ``edge_features`` will be transferred.
-    - When ``node_pairs`` is not None and ``seed_nodes`` is None, an edge/link
-      related task is inferred. Only ``labels``, ``compacted_node_pairs``,
-      ``compacted_negative_srcs``, ``compacted_negative_dsts``,
-      ``sampled_subgraphs``, ``node_features`` and ``edge_features`` will be
-      transferred.
-    - When ``seeds`` is not None, only ``labels``, ``compacted_seeds``,
-      ``sampled_subgraphs``, ``node_features`` and ``edge_features`` will be
-      transferred.
-    - Otherwise, all attributes will be transferred.
-    - If you want some other attributes to be transferred as well, specify
-      their names in ``extra_attrs``. For instance, the following code will
-      copy ``seed_nodes`` to the GPU as well:
-
-    .. code:: python
-
-       datapipe = datapipe.copy_to(device="cuda", extra_attrs=["seed_nodes"])
-
     Parameters
     ----------
     datapipe : DataPipe
         The DataPipe.
     device : torch.device
         The PyTorch CUDA device.
-    extra_attrs : List[str]
-        The extra attributes of the data in the DataPipe that you want carried
-        to the specified device. Attributes listed in ``extra_attrs`` are
-        transferred regardless of the inferred task; this also applies to
-        classes other than :class:`~dgl.graphbolt.MiniBatch`.
     """

-    def __init__(self, datapipe, device, extra_attrs=None):
+    def __init__(self, datapipe, device):
         super().__init__()
         self.datapipe = datapipe
         self.device = device
-        self.extra_attrs = extra_attrs

     def __iter__(self):
         for data in self.datapipe:
             data = recursive_apply(data, apply_to, self.device)
-            if self.extra_attrs is not None:
-                for attr in self.extra_attrs:
-                    setattr(
-                        data,
-                        attr,
-                        recursive_apply(
-                            getattr(data, attr), apply_to, self.device
-                        ),
-                    )
             yield data
......
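
With ``extra_attrs`` removed from ``CopyTo`` above, migrating an existing pipeline is a one-line change; a sketch, assuming ``datapipe`` is a GraphBolt DataPipe yielding MiniBatches:

    # Before this change (argument no longer accepted):
    #   datapipe = datapipe.copy_to(device="cuda", extra_attrs=["seed_nodes"])
    # After: pass only the device; attribute selection is automatic.
    datapipe = datapipe.copy_to(device="cuda")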
@@ -36,14 +36,13 @@ def test_CopyTo():
         "node_inference",
         "link_prediction",
         "edge_classification",
-        "extra_attrs",
     ],
 )
 @unittest.skipIf(F._default_context_str == "cpu", "CopyTo needs GPU to test")
 def test_CopyToWithMiniBatches(task):
     N = 16
     B = 2
-    if task == "node_classification" or task == "extra_attrs":
+    if task == "node_classification":
         itemset = gb.ItemSet(
             (torch.arange(N), torch.arange(N)), names=("seeds", "labels")
         )
@@ -114,16 +113,11 @@ def test_CopyToWithMiniBatches(task):
         else:
             assert var.device.type == "cpu", attr

-    if task == "extra_attrs":
-        extra_attrs = ["seed_nodes"]
-    else:
-        extra_attrs = None
-
     # Invoke CopyTo via class constructor.
-    test_data_device(gb.CopyTo(datapipe, "cuda", extra_attrs))
+    test_data_device(gb.CopyTo(datapipe, "cuda"))
     # Invoke CopyTo via functional form.
-    test_data_device(datapipe.copy_to("cuda", extra_attrs))
+    test_data_device(datapipe.copy_to("cuda"))

 def test_etype_tuple_to_str():
......
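
As the updated test exercises, the simplified transfer stage can still be created either through the class constructor or through the functional form registered on the DataPipe; a minimal sketch, assuming ``datapipe`` is any GraphBolt DataPipe and a CUDA device is available:

    import dgl.graphbolt as gb

    # Class-constructor form.
    gpu_pipe = gb.CopyTo(datapipe, "cuda")
    # Equivalent functional form.
    gpu_pipe = datapipe.copy_to("cuda")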
@@ -15,7 +15,7 @@ from . import gb_test_utils
 def test_DataLoader():
     N = 40
     B = 4
-    itemset = dgl.graphbolt.ItemSet(torch.arange(N), names="seed_nodes")
+    itemset = dgl.graphbolt.ItemSet(torch.arange(N), names="seeds")
     graph = gb_test_utils.rand_csc_graph(200, 0.15, bidirection_edge=True)
     features = {}
     keys = [("node", None, "a"), ("node", None, "b")]
@@ -62,7 +62,7 @@ def test_gpu_sampling_DataLoader(
     N = 40
     B = 4
     num_layers = 2
-    itemset = dgl.graphbolt.ItemSet(torch.arange(N), names="seed_nodes")
+    itemset = dgl.graphbolt.ItemSet(torch.arange(N), names="seeds")
     graph = gb_test_utils.rand_csc_graph(200, 0.15, bidirection_edge=True).to(
         F.ctx()
     )
@@ -77,7 +77,7 @@ def test_gpu_sampling_DataLoader(
     feature_store = dgl.graphbolt.BasicFeatureStore(features)
     datapipe = dgl.graphbolt.ItemSampler(itemset, batch_size=B)
-    datapipe = datapipe.copy_to(F.ctx(), extra_attrs=["seed_nodes"])
+    datapipe = datapipe.copy_to(F.ctx())
     datapipe = getattr(dgl.graphbolt, sampler_name)(
         datapipe,
         graph,
......
@@ -108,7 +108,7 @@ def create_dataloader(
         shuffle=is_train,
         drop_uneven_inputs=is_train,
     )
-    datapipe = datapipe.copy_to(device, extra_attrs=["seed_nodes"])
+    datapipe = datapipe.copy_to(device)
     # Now that we have moved to device, sample_neighbor and fetch_feature steps
     # will be executed on GPUs.
     datapipe = datapipe.sample_neighbor(graph, [10, 10, 10])
......