Unverified Commit 836fbb00 authored by Rhett Ying, committed by GitHub

[GraphBolt] move to_dgl() from datapipe to dataloader iter (#6728)

parent b20455a2
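
In short: `to_dgl()` is no longer registered as a datapipe stage; each `MiniBatch` yielded by the `DataLoader` is converted to a `DGLMiniBatch` inside the training loop instead. A minimal sketch of the new pattern, assembled from the tutorial snippets touched below and assuming `g`, `feature`, `train_set`, `device`, and `model` are defined as in those tutorials:

    import dgl.graphbolt as gb

    # Old pattern (before this commit): conversion was a datapipe stage.
    #   datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
    #   datapipe = datapipe.to_dgl()
    #   datapipe = datapipe.copy_to(device)

    # New pattern: build the datapipe without to_dgl(), then convert each
    # yielded MiniBatch in the loop. (g, feature, train_set, device, model
    # are assumed to be defined elsewhere, as in the docs being edited.)
    datapipe = gb.ItemSampler(train_set, batch_size=1024, shuffle=True)
    datapipe = datapipe.sample_neighbor(g, [10, 10])  # 2 layers.
    datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
    datapipe = datapipe.copy_to(device)
    dataloader = gb.DataLoader(datapipe, num_workers=0)

    for data in dataloader:
        data = data.to_dgl()  # MiniBatch -> DGLMiniBatch
        input_features = data.node_features["feat"]
        output_labels = data.labels
        output_predictions = model(data.blocks, input_features)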
@@ -47,11 +47,11 @@ To use this sampler with :class:`~dgl.graphbolt.DataLoader`:
     datapipe = gb.ItemSampler(train_set, batch_size=1024, shuffle=True)
     datapipe = datapipe.customized_sample_neighbor(g, [10, 10])  # 2 layers.
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
     for data in dataloader:
+        data = data.to_dgl()
         input_features = data.node_features["feat"]
         output_labels = data.labels
         output_predictions = model(data.blocks, input_features)
@@ -93,11 +93,11 @@ can be used on heterogeneous graphs:
     datapipe = datapipe.fetch_feature(
         feature, node_feature_keys={"user": ["feat"], "item": ["feat"]}
     )
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
     for data in dataloader:
+        data = data.to_dgl()
         input_features = {
             ntype: data.node_features[(ntype, "feat")]
             for ntype in data.blocks[0].srctypes
...
@@ -38,16 +38,18 @@ edges(namely, node pairs) in the training set instead of the nodes.
     # Or equivalently:
     # datapipe = gb.NeighborSampler(datapipe, g, [10, 10])
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
-Iterating over the DataLoader will yield :class:`~dgl.graphbolt.DGLMiniBatch`
+Iterating over the DataLoader will yield :class:`~dgl.graphbolt.MiniBatch`
 which contains a list of specially created graphs representing the computation
-dependencies on each layer. They are called *message flow graphs* (MFGs) in DGL.
+dependencies on each layer. In order to train with DGL, you need to convert them
+to :class:`~dgl.graphbolt.DGLMiniBatch`. Then you can access the
+*message flow graphs* (MFGs).
 .. code:: python
     mini_batch = next(iter(dataloader))
+    mini_batch = mini_batch.to_dgl()
     print(mini_batch.blocks)
 .. note::
@@ -91,7 +93,6 @@ You can use :func:`~dgl.graphbolt.exclude_seed_edges` alongside with
     exclude_seed_edges = partial(gb.exclude_seed_edges, include_reverse_edges=True)
     datapipe = datapipe.transform(exclude_seed_edges)
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
@@ -181,6 +182,7 @@ their incident node representations.
     opt = torch.optim.Adam(model.parameters())
     for data in dataloader:
+        data = data.to_dgl()
         blocks = data.blocks
         x = data.edge_features("feat")
         y_hat = model(data.blocks, x, data.positive_node_pairs)
@@ -273,7 +275,6 @@ only difference is that the train_set is now an instance of
     datapipe = datapipe.fetch_feature(
         feature, node_feature_keys={"item": ["feat"], "user": ["feat"]}
     )
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
@@ -310,17 +311,17 @@ dictionaries of node types and predictions here.
 .. code:: python
+    import torch.nn.functional as F
     model = Model(in_features, hidden_features, out_features, num_classes, etypes)
-    model = model.cuda()
+    model = model.to(device)
     opt = torch.optim.Adam(model.parameters())
-    for input_nodes, edge_subgraph, blocks in dataloader:
-        blocks = [b.to(torch.device('cuda')) for b in blocks]
-        edge_subgraph = edge_subgraph.to(torch.device('cuda'))
-        input_features = blocks[0].srcdata['features']
-        edge_labels = edge_subgraph.edata['labels']
-        edge_predictions = model(edge_subgraph, blocks, input_features)
-        loss = compute_loss(edge_labels, edge_predictions)
+    for data in dataloader:
+        data = data.to_dgl()
+        blocks = data.blocks
+        x = data.edge_features(("user:like:item", "feat"))
+        y_hat = model(data.blocks, x, data.positive_node_pairs)
+        loss = F.cross_entropy(data.labels, y_hat)
         opt.zero_grad()
         loss.backward()
         opt.step()
...
@@ -47,7 +47,6 @@ only one layer at a time.
     datapipe = gb.ItemSampler(all_nodes_set, batch_size=1024, shuffle=True)
     datapipe = datapipe.sample_neighbor(g, [-1])  # 1 layers.
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
@@ -100,6 +99,7 @@ and combined as well.
     feature = feature.to(device)
     for step, data in tqdm(enumerate(dataloader)):
+        data = data.to_dgl()
         x = feature[data.input_nodes]
         hidden_x = layer(data.blocks[0], x)  # len(blocks) = 1
         if not is_last_layer:
...
@@ -27,7 +27,6 @@ The whole data loader pipeline is as follows:
     datapipe = datapipe.sample_neighbor(g, [10, 10])  # 2 layers.
     datapipe = datapipe.transform(gb.exclude_seed_edges)
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
@@ -130,6 +129,8 @@ above.
     total_loss = 0
     start_epoch_time = time.time()
     for step, data in enumerate(dataloader):
+        # Convert MiniBatch to DGLMiniBatch.
+        data = data.to_dgl()
         # Unpack MiniBatch.
         compacted_pairs, labels = to_binary_link_dgl_computing_pack(data)
         node_feature = data.node_features["feat"]
@@ -213,7 +214,6 @@ only difference is that you need to give edge types for feature fetching.
         feature,
         node_feature_keys={"user": ["feat"], "item": ["feat"]}
     )
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
@@ -273,6 +273,8 @@ except for computing loss on specific edge type.
     total_loss = 0
     start_epoch_time = time.time()
     for step, data in enumerate(dataloader):
+        # Convert MiniBatch to DGLMiniBatch.
+        data = data.to_dgl()
         # Unpack MiniBatch.
         compacted_pairs, labels = to_binary_link_dgl_computing_pack(data, category)
         node_features = {
...
@@ -50,18 +50,20 @@ putting the list of generated MFGs onto GPU.
     # Or equivalently:
     # datapipe = gb.NeighborSampler(datapipe, g, [10, 10])
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
-Iterating over the DataLoader will yield :class:`~dgl.graphbolt.DGLMiniBatch`
+Iterating over the DataLoader will yield :class:`~dgl.graphbolt.MiniBatch`
 which contains a list of specially created graphs representing the computation
-dependencies on each layer. They are called *message flow graphs* (MFGs) in DGL.
+dependencies on each layer. In order to train with DGL, you need to convert them
+to :class:`~dgl.graphbolt.DGLMiniBatch`. Then you could access the
+*message flow graphs* (MFGs).
 .. code:: python
     mini_batch = next(iter(dataloader))
+    mini_batch = mini_batch.to_dgl()
     print(mini_batch.blocks)
@@ -128,17 +130,20 @@ Training Loop
 The training loop simply consists of iterating over the dataset with the
 customized batching iterator. During each iteration that yields
-:class:`~dgl.graphbolt.DGLMiniBatch`, we:
+:class:`~dgl.graphbolt.MiniBatch`, we:
-1. Access the node features corresponding to the input nodes via
+1. Convert the :class:`~dgl.graphbolt.MiniBatch` to
+   :class:`~dgl.graphbolt.DGLMiniBatch`.
+2. Access the node features corresponding to the input nodes via
    ``data.node_features["feat"]``. These features are already moved to the
    target device (CPU or GPU) by the data loader.
-2. Access the node labels corresponding to the output nodes via
+3. Access the node labels corresponding to the output nodes via
    ``data.labels``. These labels are already moved to the target device
    (CPU or GPU) by the data loader.
-3. Feed the list of MFGs and the input node features to the multilayer
+4. Feed the list of MFGs and the input node features to the multilayer
    GNN and get the outputs.
 4. Compute the loss and backpropagate.
@@ -150,6 +155,7 @@ customized batching iterator. During each iteration that yields
     opt = torch.optim.Adam(model.parameters())
     for data in dataloader:
+        data = data.to_dgl()
         input_features = data.node_features["feat"]
         output_labels = data.labels
         output_predictions = model(data.blocks, input_features)
@@ -215,7 +221,6 @@ of node types to node IDs.
     datapipe = datapipe.fetch_feature(
         feature, node_feature_keys={"author": ["feat"], "paper": ["feat"]}
     )
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
@@ -230,6 +235,7 @@ dictionaries of node types and predictions here.
     opt = torch.optim.Adam(model.parameters())
     for data in dataloader:
+        data = data.to_dgl()
         # For heterogeneous graphs, we need to specify the node types and
         # feature name when accessing the node features. So does the labels.
         input_features = {
...
@@ -21,7 +21,6 @@ generate a minibatch, including:
     datapipe = datapipe.sample_neighbor(g, [10, 10])  # 2 layers.
     datapipe = datapipe.transform(gb.exclude_seed_edges)
     datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
-    datapipe = datapipe.to_dgl()
     datapipe = datapipe.copy_to(device)
     dataloader = gb.DataLoader(datapipe, num_workers=0)
...