Unverified Commit 1e40c81b authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[doc] update minibatch link prediction (#6649)

parent aa9c1101
......@@ -5,44 +5,40 @@
:ref:`(中文版) <guide_cn-minibatch-link-classification-sampler>`
Define a neighborhood sampler and data loader with negative sampling
Define a data loader with neighbor and negative sampling
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You can still use the same neighborhood sampler as the one in node/edge
classification.
You can still use the same data loader as the one in node/edge classification.
The only difference is that you need to add an additional stage
`negative sampling` before neighbor sampling stage. The following data loader
will pick 5 negative destination nodes uniformly for each source node of an
edge.
.. code:: python
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
datapipe = datapipe.sample_uniform_negative(graph, 5)
:func:`~dgl.dataloading.as_edge_prediction_sampler` in DGL also
supports generating negative samples for link prediction. To do so, you
need to provide the negative sampling function.
:class:`~dgl.dataloading.negative_sampler.Uniform` is a
function that does uniform sampling. For each source node of an edge, it
samples ``k`` negative destination nodes.
The following data loader will pick 5 negative destination nodes
uniformly for each source node of an edge.
The whole data loader pipeline is as follows:
.. code:: python
sampler = dgl.dataloading.as_edge_prediction_sampler(
sampler, negative_sampler=dgl.dataloading.negative_sampler.Uniform(5))
dataloader = dgl.dataloading.DataLoader(
g, train_seeds, sampler,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
pin_memory=True,
num_workers=args.num_workers)
datapipe = gb.ItemSampler(itemset, batch_size=1024, shuffle=True)
datapipe = datapipe.sample_uniform_negative(graph, 5)
datapipe = datapipe.sample_neighbor(g, [10, 10]) # 2 layers.
datapipe = datapipe.transform(gb.exclude_seed_edges)
datapipe = datapipe.fetch_feature(feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
For the builtin negative samplers please see :ref:`api-dataloading-negative-sampling`.
For the details about the builtin uniform negative sampler please see
:class:`~dgl.graphbolt.UniformNegativeSampler`.
You can also give your own negative sampler function, as long as it
takes in the original graph ``g`` and the minibatch edge ID array
``eid``, and returns a pair of source ID arrays and destination ID
arrays.
You can also give your own negative sampler function, as long as it inherits
from :class:`~dgl.graphbolt.NegativeSampler` and overrides the
:meth:`~dgl.graphbolt.NegativeSampler._sample_with_etype` method which takes in
the node pairs in minibatch, and returns the negative node pairs back.
The following gives an example of custom negative sampler that samples
negative destination nodes according to a probability distribution
......@@ -50,88 +46,73 @@ proportional to a power of degrees.
.. code:: python
class NegativeSampler(object):
def __init__(self, g, k):
@functional_datapipe("customized_sample_negative")
class CustomizedNegativeSampler(dgl.graphbolt.NegativeSampler):
def __init__(self, datapipe, k, node_degrees):
super().__init__(datapipe, k)
# caches the probability distribution
self.weights = g.in_degrees().float() ** 0.75
self.weights = node_degrees ** 0.75
self.k = k
def __call__(self, g, eids):
src, _ = g.find_edges(eids)
def _sample_with_etype(node_pairs, etype=None):
src, _ = node_pairs
src = src.repeat_interleave(self.k)
dst = self.weights.multinomial(len(src), replacement=True)
return src, dst
sampler = dgl.dataloading.as_edge_prediction_sampler(
sampler, negative_sampler=NegativeSampler(g, 5))
dataloader = dgl.dataloading.DataLoader(
g, train_seeds, sampler,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
pin_memory=True,
num_workers=args.num_workers)
Adapt your model for minibatch training
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
As explained in :ref:`guide-training-link-prediction`, link prediction is trained
via comparing the score of an edge (positive example) against a
non-existent edge (negative example). To compute the scores of edges you
can reuse the node representation computation model you have seen in
edge classification/regression.
datapipe = datapipe.customized_sample_negative(5, node_degrees)
.. code:: python
class StochasticTwoLayerGCN(nn.Module):
def __init__(self, in_features, hidden_features, out_features):
super().__init__()
self.conv1 = dgl.nn.GraphConv(in_features, hidden_features)
self.conv2 = dgl.nn.GraphConv(hidden_features, out_features)
def forward(self, blocks, x):
x = F.relu(self.conv1(blocks[0], x))
x = F.relu(self.conv2(blocks[1], x))
return x
For score prediction, since you only need to predict a scalar score for
each edge instead of a probability distribution, this example shows how
to compute a score with a dot product of incident node representations.
Define a GraphSAGE model for minibatch training
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
class ScorePredictor(nn.Module):
def forward(self, edge_subgraph, x):
with edge_subgraph.local_scope():
edge_subgraph.ndata['x'] = x
edge_subgraph.apply_edges(dgl.function.u_dot_v('x', 'x', 'score'))
return edge_subgraph.edata['score']
When a negative sampler is provided, DGLs data loader will generate
three items per minibatch:
- A positive graph containing all the edges sampled in the minibatch.
class SAGE(nn.Module):
def __init__(self, in_size, hidden_size):
super().__init__()
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_size, hidden_size, "mean"))
self.layers.append(dglnn.SAGEConv(hidden_size, hidden_size, "mean"))
self.layers.append(dglnn.SAGEConv(hidden_size, hidden_size, "mean"))
self.hidden_size = hidden_size
self.predictor = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, 1),
)
- A negative graph containing all the non-existent edges generated by
the negative sampler.
def forward(self, blocks, x):
hidden_x = x
for layer_idx, (layer, block) in enumerate(zip(self.layers, blocks)):
hidden_x = layer(block, hidden_x)
is_last_layer = layer_idx == len(self.layers) - 1
if not is_last_layer:
hidden_x = F.relu(hidden_x)
return hidden_x
- A list of *message flow graphs* (MFGs) generated by the neighborhood sampler.
So one can define the link prediction model as follows that takes in the
three items as well as the input features.
When a negative sampler is provided, the data loader will generate positive and
negative node pairs for each minibatch besides the *Message Flow Graphs* (MFGs).
Let's define a utility function to compact node pairs as follows:
.. code:: python
class Model(nn.Module):
def __init__(self, in_features, hidden_features, out_features):
super().__init__()
self.gcn = StochasticTwoLayerGCN(
in_features, hidden_features, out_features)
def to_binary_link_dgl_computing_pack(data: gb.DGLMiniBatch):
"""Convert the minibatch to a training pair and a label tensor."""
pos_src, pos_dst = data.positive_node_pairs
neg_src, neg_dst = data.negative_node_pairs
node_pairs = (
torch.cat((pos_src, neg_src), dim=0),
torch.cat((pos_dst, neg_dst), dim=0),
)
pos_label = torch.ones_like(pos_src)
neg_label = torch.zeros_like(neg_src)
labels = torch.cat([pos_label, neg_label], dim=0)
return (node_pairs, labels.float())
def forward(self, positive_graph, negative_graph, blocks, x):
x = self.gcn(blocks, x)
pos_score = self.predictor(positive_graph, x)
neg_score = self.predictor(negative_graph, x)
return pos_score, neg_score
Training loop
~~~~~~~~~~~~~
......@@ -142,164 +123,176 @@ above.
.. code:: python
def compute_loss(pos_score, neg_score):
# an example hinge loss
n = pos_score.shape[0]
return (neg_score.view(n, -1) - pos_score.view(n, -1) + 1).clamp(min=0).mean()
model = Model(in_features, hidden_features, out_features)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, positive_graph, negative_graph, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
positive_graph = positive_graph.to(torch.device('cuda'))
negative_graph = negative_graph.to(torch.device('cuda'))
input_features = blocks[0].srcdata['features']
pos_score, neg_score = model(positive_graph, negative_graph, blocks, input_features)
loss = compute_loss(pos_score, neg_score)
opt.zero_grad()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
for epoch in tqdm.trange(args.epochs):
model.train()
total_loss = 0
start_epoch_time = time.time()
for step, data in enumerate(dataloader):
# Unpack MiniBatch.
compacted_pairs, labels = to_binary_link_dgl_computing_pack(data)
node_feature = data.node_features["feat"]
# Convert sampled subgraphs to DGL blocks.
blocks = data.blocks
# Get the embeddings of the input nodes.
y = model(blocks, node_feature)
logits = model.predictor(
y[compacted_pairs[0]] * y[compacted_pairs[1]]
).squeeze()
# Compute loss.
loss = F.binary_cross_entropy_with_logits(logits, labels)
optimizer.zero_grad()
loss.backward()
opt.step()
optimizer.step()
total_loss += loss.item()
end_epoch_time = time.time()
DGL provides the
`unsupervised learning GraphSAGE <https://github.com/dmlc/dgl/blob/master/examples/pytorch/graphsage/train_sampling_unsupervised.py>`__
`unsupervised learning GraphSAGE <https://github.com/dmlc/dgl/blob/master/examples/sampling/graphbolt/link_prediction.py>`__
that shows an example of link prediction on homogeneous graphs.
For heterogeneous graphs
~~~~~~~~~~~~~~~~~~~~~~~~
The models computing the node representations on heterogeneous graphs
can also be used for computing incident node representations for edge
classification/regression.
The previous model could be easily extended to heterogeneous graphs. The only
difference is that you need to use :class:`~dgl.nn.HeteroGraphConv` to wrap
:class:`~dgl.nn.SAGEConv` according to edge types.
.. code:: python
class StochasticTwoLayerRGCN(nn.Module):
def __init__(self, in_feat, hidden_feat, out_feat, rel_names):
class SAGE(nn.Module):
def __init__(self, in_size, hidden_size):
super().__init__()
self.conv1 = dglnn.HeteroGraphConv({
rel : dglnn.GraphConv(in_feat, hidden_feat, norm='right')
self.layers = nn.ModuleList()
self.layers.append(dglnn.HeteroGraphConv({
rel : dglnn.SAGEConv(in_size, hidden_size, "mean")
for rel in rel_names
})
self.conv2 = dglnn.HeteroGraphConv({
rel : dglnn.GraphConv(hidden_feat, out_feat, norm='right')
}))
self.layers.append(dglnn.HeteroGraphConv({
rel : dglnn.SAGEConv(hidden_size, hidden_size, "mean")
for rel in rel_names
})
}))
self.layers.append(dglnn.HeteroGraphConv({
rel : dglnn.SAGEConv(hidden_size, hidden_size, "mean")
for rel in rel_names
}))
self.hidden_size = hidden_size
self.predictor = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, 1),
)
def forward(self, blocks, x):
x = self.conv1(blocks[0], x)
x = self.conv2(blocks[1], x)
return x
For score prediction, the only implementation difference between the
homogeneous graph and the heterogeneous graph is that we are looping
over the edge types for :meth:`dgl.DGLGraph.apply_edges`.
hidden_x = x
for layer_idx, (layer, block) in enumerate(zip(self.layers, blocks)):
hidden_x = layer(block, hidden_x)
is_last_layer = layer_idx == len(self.layers) - 1
if not is_last_layer:
hidden_x = F.relu(hidden_x)
return hidden_x
.. code:: python
class ScorePredictor(nn.Module):
def forward(self, edge_subgraph, x):
with edge_subgraph.local_scope():
edge_subgraph.ndata['x'] = x
for etype in edge_subgraph.canonical_etypes:
edge_subgraph.apply_edges(
dgl.function.u_dot_v('x', 'x', 'score'), etype=etype)
return edge_subgraph.edata['score']
class Model(nn.Module):
def __init__(self, in_features, hidden_features, out_features, num_classes,
etypes):
super().__init__()
self.rgcn = StochasticTwoLayerRGCN(
in_features, hidden_features, out_features, etypes)
self.pred = ScorePredictor()
def forward(self, positive_graph, negative_graph, blocks, x):
x = self.rgcn(blocks, x)
pos_score = self.pred(positive_graph, x)
neg_score = self.pred(negative_graph, x)
return pos_score, neg_score
Data loader definition is also very similar to that of edge
classification/regression. The only difference is that you need to give
the negative sampler and you will be supplying a dictionary of edge
types and edge ID tensors instead of a dictionary of node types and node
ID tensors.
Data loader definition is also very similar to that for homogeneous graph. The
only difference is that you need to give edge types for feature fetching.
.. code:: python
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
sampler = dgl.dataloading.as_edge_prediction_sampler(
sampler, negative_sampler=dgl.dataloading.negative_sampler.Uniform(5))
dataloader = dgl.dataloading.DataLoader(
g, train_eid_dict, sampler,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
If you want to give your own negative sampling function, the function
should take in the original graph and the dictionary of edge types and
edge ID tensors. It should return a dictionary of edge types and
source-destination array pairs. An example is given as follows:
datapipe = gb.ItemSampler(itemset, batch_size=1024, shuffle=True)
datapipe = datapipe.sample_uniform_negative(graph, 5)
datapipe = datapipe.sample_neighbor(g, [10, 10]) # 2 layers.
datapipe = datapipe.transform(gb.exclude_seed_edges)
datapipe = datapipe.fetch_feature(
feature,
node_feature_keys={"user": ["feat"], "item": ["feat"]}
)
datapipe = datapipe.to_dgl()
datapipe = datapipe.copy_to(device)
dataloader = gb.MultiProcessDataLoader(datapipe, num_workers=0)
If you want to give your own negative sampling function, just inherit from the
:class:`~dgl.graphbolt.NegativeSampler` class and override the
:meth:`~dgl.graphbolt.NegativeSampler._sample_with_etype` method.
.. code:: python
class NegativeSampler(object):
def __init__(self, g, k):
@functional_datapipe("customized_sample_negative")
class CustomizedNegativeSampler(dgl.graphbolt.NegativeSampler):
def __init__(self, datapipe, k, node_degrees):
super().__init__(datapipe, k)
# caches the probability distribution
self.weights = {
etype: g.in_degrees(etype=etype).float() ** 0.75
for etype in g.canonical_etypes}
etype: node_degrees[etype] ** 0.75 for etype in node_degrees
}
self.k = k
def __call__(self, g, eids_dict):
result_dict = {}
for etype, eids in eids_dict.items():
src, _ = g.find_edges(eids, etype=etype)
def _sample_with_etype(node_pairs, etype):
src, _ = node_pairs
src = src.repeat_interleave(self.k)
dst = self.weights[etype].multinomial(len(src), replacement=True)
result_dict[etype] = (src, dst)
return result_dict
return src, dst
datapipe = datapipe.customized_sample_negative(5, node_degrees)
Then you can give the dataloader a dictionary of edge types and edge IDs as well as the negative
sampler. For instance, the following iterates over all edges of the heterogeneous graph.
For heterogeneous graphs, node pairs are grouped by edge types.
.. code:: python
train_eid_dict = {
etype: g.edges(etype=etype, form='eid')
for etype in g.canonical_etypes}
sampler = dgl.dataloading.as_edge_prediction_sampler(
sampler, negative_sampler=NegativeSampler(g, 5))
dataloader = dgl.dataloading.DataLoader(
g, train_eid_dict, sampler,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=4)
def to_binary_link_dgl_computing_pack(data: gb.DGLMiniBatch, etype):
"""Convert the minibatch to a training pair and a label tensor."""
pos_src, pos_dst = data.positive_node_pairs[etype]
neg_src, neg_dst = data.negative_node_pairs[etype]
node_pairs = (
torch.cat((pos_src, neg_src), dim=0),
torch.cat((pos_dst, neg_dst), dim=0),
)
pos_label = torch.ones_like(pos_src)
neg_label = torch.zeros_like(neg_src)
labels = torch.cat([pos_label, neg_label], dim=0)
return (node_pairs, labels.float())
The training loop is again almost the same as that on homogeneous graph,
except for the implementation of ``compute_loss`` that will take in two
dictionaries of node types and predictions here.
except for computing loss on specific edge type.
.. code:: python
model = Model(in_features, hidden_features, out_features, num_classes, etypes)
model = model.cuda()
opt = torch.optim.Adam(model.parameters())
for input_nodes, positive_graph, negative_graph, blocks in dataloader:
blocks = [b.to(torch.device('cuda')) for b in blocks]
positive_graph = positive_graph.to(torch.device('cuda'))
negative_graph = negative_graph.to(torch.device('cuda'))
input_features = blocks[0].srcdata['features']
pos_score, neg_score = model(positive_graph, negative_graph, blocks, input_features)
loss = compute_loss(pos_score, neg_score)
opt.zero_grad()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
category = "user"
for epoch in tqdm.trange(args.epochs):
model.train()
total_loss = 0
start_epoch_time = time.time()
for step, data in enumerate(dataloader):
# Unpack MiniBatch.
compacted_pairs, labels = to_binary_link_dgl_computing_pack(data, category)
node_features = {
ntype: data.node_features[(ntype, "feat")]
for ntype in data.blocks[0].srctypes
}
# Convert sampled subgraphs to DGL blocks.
blocks = data.blocks
# Get the embeddings of the input nodes.
y = model(blocks, node_feature)
logits = model.predictor(
y[category][compacted_pairs[0]] * y[category][compacted_pairs[1]]
).squeeze()
# Compute loss.
loss = F.binary_cross_entropy_with_logits(logits, labels)
optimizer.zero_grad()
loss.backward()
opt.step()
optimizer.step()
total_loss += loss.item()
end_epoch_time = time.time()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment