Unverified Commit afca1114 authored by peizhou001's avatar peizhou001 Committed by GitHub
Browse files

[Graphbolt]Fix negative sampler (#6933)


Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-21-218.ap-northeast-1.compute.internal>
parent 3ff7ad9d
...@@ -359,32 +359,6 @@ class FusedCSCSamplingGraph : public torch::CustomClassHolder { ...@@ -359,32 +359,6 @@ class FusedCSCSamplingGraph : public torch::CustomClassHolder {
torch::optional<std::string> node_timestamp_attr_name, torch::optional<std::string> node_timestamp_attr_name,
torch::optional<std::string> edge_timestamp_attr_name) const; torch::optional<std::string> edge_timestamp_attr_name) const;
/**
* @brief Sample negative edges by randomly choosing negative
* source-destination pairs according to a uniform distribution. For each edge
* ``(u, v)``, it is supposed to generate `negative_ratio` pairs of negative
* edges ``(u, v')``, where ``v'`` is chosen uniformly from all the nodes in
* the graph.
*
* @param node_pairs A tuple of two 1D tensors that represent the source and
* destination of positive edges, with 'positive' indicating that these edges
* are present in the graph. It's important to note that within the context of
* a heterogeneous graph, the ids in these tensors signify heterogeneous ids.
* Only the source tensor is read by the implementation; the destination
* tensor is ignored.
* @param negative_ratio The ratio of the number of negative samples to
* positive samples. The source tensor is repeated this many times.
* @param max_node_id The upper bound passed to the random integer generator
* when drawing negative destinations (lower bound is 0). It should correspond
* to the number of nodes of a specific type.
*
* @return A tuple consisting of two 1D tensors representing the source and
* destination of negative edges, each of length
* `num_positive_edges * negative_ratio`. The destination tensor is created
* with the same tensor options as the positive source tensor. In the context
* of a heterogeneous graph, both the input nodes and the selected nodes are
* represented by heterogeneous IDs. Note that negative refers to false
* negatives, which means the edge could be present or not present in the
* graph.
*/
std::tuple<torch::Tensor, torch::Tensor> SampleNegativeEdgesUniform(
const std::tuple<torch::Tensor, torch::Tensor>& node_pairs,
int64_t negative_ratio, int64_t max_node_id) const;
/** /**
* @brief Copy the graph to shared memory. * @brief Copy the graph to shared memory.
* @param shared_memory_name The name of the shared memory. * @param shared_memory_name The name of the shared memory.
......
...@@ -712,18 +712,6 @@ FusedCSCSamplingGraph::TemporalSampleNeighbors( ...@@ -712,18 +712,6 @@ FusedCSCSamplingGraph::TemporalSampleNeighbors(
} }
} }
/**
 * @brief Uniformly sample negative destinations for a batch of positive edges.
 *
 * Only the source tensor of `node_pairs` is consulted. The sources are tiled
 * `negative_ratio` times and paired with destinations drawn from
 * `torch::randint(0, max_node_id, ...)`, created with the same tensor options
 * as the positive sources.
 *
 * NOTE(review): `repeat` tiles the whole source tensor (s0, s1, ..., s0, s1,
 * ...) rather than repeating each element consecutively. If a caller reshapes
 * the result to (num_positive, negative_ratio), rows would mix different
 * sources -- confirm the ordering expected by callers.
 *
 * @param node_pairs Tuple of (source, destination) tensors of positive edges.
 * @param negative_ratio Number of negatives to generate per positive edge.
 * @param max_node_id Upper bound handed to `torch::randint`.
 *
 * @return Tuple of (negative sources, negative destinations), each holding
 * `num_positive * negative_ratio` elements.
 */
std::tuple<torch::Tensor, torch::Tensor>
FusedCSCSamplingGraph::SampleNegativeEdgesUniform(
    const std::tuple<torch::Tensor, torch::Tensor>& node_pairs,
    int64_t negative_ratio, int64_t max_node_id) const {
  // The positive destinations play no role in uniform sampling.
  const torch::Tensor positive_sources = std::get<0>(node_pairs);
  const auto num_negatives = positive_sources.size(0) * negative_ratio;

  auto sampled_destinations = torch::randint(
      0, max_node_id, {num_negatives}, positive_sources.options());
  auto tiled_sources = positive_sources.repeat(negative_ratio);

  return std::make_tuple(tiled_sources, sampled_destinations);
}
static c10::intrusive_ptr<FusedCSCSamplingGraph> static c10::intrusive_ptr<FusedCSCSamplingGraph>
BuildGraphFromSharedMemoryHelper(SharedMemoryHelper&& helper) { BuildGraphFromSharedMemoryHelper(SharedMemoryHelper&& helper) {
helper.InitializeRead(); helper.InitializeRead();
......
...@@ -52,9 +52,6 @@ TORCH_LIBRARY(graphbolt, m) { ...@@ -52,9 +52,6 @@ TORCH_LIBRARY(graphbolt, m) {
.def( .def(
"temporal_sample_neighbors", "temporal_sample_neighbors",
&FusedCSCSamplingGraph::TemporalSampleNeighbors) &FusedCSCSamplingGraph::TemporalSampleNeighbors)
.def(
"sample_negative_edges_uniform",
&FusedCSCSamplingGraph::SampleNegativeEdgesUniform)
.def("copy_to_shared_memory", &FusedCSCSamplingGraph::CopyToSharedMemory) .def("copy_to_shared_memory", &FusedCSCSamplingGraph::CopyToSharedMemory)
.def_pickle( .def_pickle(
// __getstate__ // __getstate__
......
...@@ -876,7 +876,8 @@ class FusedCSCSamplingGraph(SamplingGraph): ...@@ -876,7 +876,8 @@ class FusedCSCSamplingGraph(SamplingGraph):
pairs according to a uniform distribution. For each edge ``(u, v)``, pairs according to a uniform distribution. For each edge ``(u, v)``,
it is supposed to generate `negative_ratio` pairs of negative edges it is supposed to generate `negative_ratio` pairs of negative edges
``(u, v')``, where ``v'`` is chosen uniformly from all the nodes in ``(u, v')``, where ``v'`` is chosen uniformly from all the nodes in
the graph. the graph. As ``u`` is exactly the same as the source of the corresponding positive edge,
it returns None for negative sources.
Parameters Parameters
---------- ----------
...@@ -903,23 +904,22 @@ class FusedCSCSamplingGraph(SamplingGraph): ...@@ -903,23 +904,22 @@ class FusedCSCSamplingGraph(SamplingGraph):
`edge_type`. Note that negative refers to false negatives, which `edge_type`. Note that negative refers to false negatives, which
means the edge could be present or not present in the graph. means the edge could be present or not present in the graph.
""" """
if edge_type is not None: if edge_type:
assert ( _, _, dst_ntype = etype_str_to_tuple(edge_type)
self.node_type_offset is not None max_node_id = self.num_nodes[dst_ntype]
), "The 'node_type_offset' array is necessary for performing \
negative sampling by edge type."
_, _, dst_node_type = etype_str_to_tuple(edge_type)
dst_node_type_id = self.node_type_to_id[dst_node_type]
offset = self._node_type_offset_list
max_node_id = (
offset[dst_node_type_id + 1] - offset[dst_node_type_id]
)
else: else:
max_node_id = self.total_num_nodes max_node_id = self.total_num_nodes
return self._c_csc_graph.sample_negative_edges_uniform( pos_src, _ = node_pairs
node_pairs, num_negative = pos_src.size(0) * negative_ratio
negative_ratio, return (
max_node_id, None,
torch.randint(
0,
max_node_id,
(num_negative,),
dtype=pos_src.dtype,
device=pos_src.device,
),
) )
def copy_to_shared_memory(self, shared_memory_name: str): def copy_to_shared_memory(self, shared_memory_name: str):
......
...@@ -32,20 +32,23 @@ class UniformNegativeSampler(NegativeSampler): ...@@ -32,20 +32,23 @@ class UniformNegativeSampler(NegativeSampler):
Examples Examples
-------- --------
>>> from dgl import graphbolt as gb >>> from dgl import graphbolt as gb
>>> indptr = torch.LongTensor([0, 2, 4, 5]) >>> indptr = torch.LongTensor([0, 1, 2, 3, 4])
>>> indices = torch.LongTensor([1, 2, 0, 2, 0]) >>> indices = torch.LongTensor([1, 2, 3, 0])
>>> graph = gb.fused_csc_sampling_graph(indptr, indices) >>> graph = gb.fused_csc_sampling_graph(indptr, indices)
>>> node_pairs = (torch.tensor([0, 1]), torch.tensor([1, 2])) >>> node_pairs = torch.tensor([[0, 1], [1, 2], [2, 3], [3, 0]])
>>> item_set = gb.ItemSet(node_pairs, names="node_pairs") >>> item_set = gb.ItemSet(node_pairs, names="node_pairs")
>>> item_sampler = gb.ItemSampler( >>> item_sampler = gb.ItemSampler(
... item_set, batch_size=1,) ... item_set, batch_size=4,)
>>> neg_sampler = gb.UniformNegativeSampler( >>> neg_sampler = gb.UniformNegativeSampler(
... item_sampler, graph, 2) ... item_sampler, graph, 2)
>>> for minibatch in neg_sampler: >>> for minibatch in neg_sampler:
... print(minibatch.negative_srcs) ... print(minibatch.negative_srcs)
... print(minibatch.negative_dsts) ... print(minibatch.negative_dsts)
(tensor([0, 0, 0]), tensor([1, 1, 2]), tensor([1, 0, 0])) None
(tensor([1, 1, 1]), tensor([2, 1, 2]), tensor([1, 0, 0])) tensor([[2, 1],
[2, 1],
[3, 2],
[1, 3]])
""" """
def __init__( def __init__(
......
...@@ -46,8 +46,7 @@ def test_UniformNegativeSampler_invoke(): ...@@ -46,8 +46,7 @@ def test_UniformNegativeSampler_invoke():
def _verify(negative_sampler): def _verify(negative_sampler):
for data in negative_sampler: for data in negative_sampler:
# Assertion # Assertion
assert data.negative_srcs.size(0) == batch_size assert data.negative_srcs is None
assert data.negative_srcs.size(1) == negative_ratio
assert data.negative_dsts.size(0) == batch_size assert data.negative_dsts.size(0) == batch_size
assert data.negative_dsts.size(1) == negative_ratio assert data.negative_dsts.size(1) == negative_ratio
...@@ -90,12 +89,9 @@ def test_Uniform_NegativeSampler(negative_ratio): ...@@ -90,12 +89,9 @@ def test_Uniform_NegativeSampler(negative_ratio):
# Assertion # Assertion
assert len(pos_src) == batch_size assert len(pos_src) == batch_size
assert len(pos_dst) == batch_size assert len(pos_dst) == batch_size
assert len(neg_src) == batch_size
assert len(neg_dst) == batch_size assert len(neg_dst) == batch_size
assert neg_src.numel() == batch_size * negative_ratio assert neg_src is None
assert neg_dst.numel() == batch_size * negative_ratio assert neg_dst.numel() == batch_size * negative_ratio
expected_src = pos_src.repeat(negative_ratio).view(-1, negative_ratio)
assert torch.equal(expected_src, neg_src)
def get_hetero_graph(): def get_hetero_graph():
......
...@@ -48,7 +48,7 @@ def test_integration_link_prediction(): ...@@ -48,7 +48,7 @@ def test_integration_link_prediction():
} }
feature_store = gb.BasicFeatureStore(features) feature_store = gb.BasicFeatureStore(features)
datapipe = gb.ItemSampler(item_set, batch_size=4) datapipe = gb.ItemSampler(item_set, batch_size=4)
datapipe = datapipe.sample_uniform_negative(graph, 1) datapipe = datapipe.sample_uniform_negative(graph, 2)
fanouts = torch.LongTensor([1]) fanouts = torch.LongTensor([1])
datapipe = datapipe.sample_neighbor(graph, [fanouts, fanouts], replace=True) datapipe = datapipe.sample_neighbor(graph, [fanouts, fanouts], replace=True)
datapipe = datapipe.transform(gb.exclude_seed_edges) datapipe = datapipe.transform(gb.exclude_seed_edges)
...@@ -62,23 +62,23 @@ def test_integration_link_prediction(): ...@@ -62,23 +62,23 @@ def test_integration_link_prediction():
str( str(
"""MiniBatch(seed_nodes=None, """MiniBatch(seed_nodes=None,
sampled_subgraphs=[SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 1, 1, 1, 1, 1, 2]), sampled_subgraphs=[SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 1, 1, 1, 1, 1, 2]),
indices=tensor([5, 4]), indices=tensor([0, 4]),
), ),
original_row_node_ids=tensor([5, 3, 1, 2, 0, 4]), original_row_node_ids=tensor([5, 3, 1, 2, 0, 4]),
original_edge_ids=None, original_edge_ids=None,
original_column_node_ids=tensor([5, 3, 1, 2, 0, 4]), original_column_node_ids=tensor([5, 3, 1, 2, 0, 4]),
), ),
SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 1, 1, 1, 1, 1]), SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 1, 1, 1, 1, 1, 2]),
indices=tensor([5]), indices=tensor([5, 4]),
), ),
original_row_node_ids=tensor([5, 3, 1, 2, 0, 4]), original_row_node_ids=tensor([5, 3, 1, 2, 0, 4]),
original_edge_ids=None, original_edge_ids=None,
original_column_node_ids=tensor([5, 3, 1, 2, 0]), original_column_node_ids=tensor([5, 3, 1, 2, 0, 4]),
)], )],
positive_node_pairs=(tensor([0, 1, 1, 1]), positive_node_pairs=(tensor([0, 1, 1, 1]),
tensor([2, 3, 3, 1])), tensor([2, 3, 3, 1])),
node_pairs_with_labels=((tensor([0, 1, 1, 1, 0, 1, 1, 1]), tensor([2, 3, 3, 1, 4, 4, 1, 4])), node_pairs_with_labels=((tensor([0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1]), tensor([2, 3, 3, 1, 4, 4, 1, 4, 0, 1, 1, 5])),
tensor([1., 1., 1., 1., 0., 0., 0., 0.])), tensor([1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0.])),
node_pairs=(tensor([5, 3, 3, 3]), node_pairs=(tensor([5, 3, 3, 3]),
tensor([1, 2, 2, 3])), tensor([1, 2, 2, 3])),
node_features={'feat': tensor([[0.5160, 0.2486], node_features={'feat': tensor([[0.5160, 0.2486],
...@@ -87,131 +87,120 @@ def test_integration_link_prediction(): ...@@ -87,131 +87,120 @@ def test_integration_link_prediction():
[0.2109, 0.1089], [0.2109, 0.1089],
[0.9634, 0.2294], [0.9634, 0.2294],
[0.5503, 0.8223]])}, [0.5503, 0.8223]])},
negative_srcs=tensor([[5], negative_srcs=None,
[3], negative_node_pairs=(tensor([0, 0, 1, 1, 1, 1, 1, 1]),
[3], tensor([4, 4, 1, 4, 0, 1, 1, 5])),
[3]]), negative_dsts=tensor([[0, 0],
negative_node_pairs=(tensor([0, 1, 1, 1]), [3, 0],
tensor([4, 4, 1, 4])), [5, 3],
negative_dsts=tensor([[0], [3, 4]]),
[0],
[3],
[0]]),
labels=None, labels=None,
input_nodes=tensor([5, 3, 1, 2, 0, 4]), input_nodes=tensor([5, 3, 1, 2, 0, 4]),
edge_features=[{}, edge_features=[{},
{}], {}],
compacted_node_pairs=(tensor([0, 1, 1, 1]), compacted_node_pairs=(tensor([0, 1, 1, 1]),
tensor([2, 3, 3, 1])), tensor([2, 3, 3, 1])),
compacted_negative_srcs=tensor([[0], compacted_negative_srcs=None,
[1], compacted_negative_dsts=tensor([[4, 4],
[1], [1, 4],
[1]]), [0, 1],
compacted_negative_dsts=tensor([[4], [1, 5]]),
[4],
[1],
[4]]),
blocks=[Block(num_src_nodes=6, num_dst_nodes=6, num_edges=2), blocks=[Block(num_src_nodes=6, num_dst_nodes=6, num_edges=2),
Block(num_src_nodes=6, num_dst_nodes=5, num_edges=1)], Block(num_src_nodes=6, num_dst_nodes=6, num_edges=2)],
)""" )"""
), ),
str( str(
"""MiniBatch(seed_nodes=None, """MiniBatch(seed_nodes=None,
sampled_subgraphs=[SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 0, 0, 1, 2]), sampled_subgraphs=[SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 0, 0, 1, 2, 3]),
indices=tensor([1, 3]), indices=tensor([4, 1, 0]),
), ),
original_row_node_ids=tensor([3, 4, 0, 5, 1]), original_row_node_ids=tensor([3, 4, 0, 1, 5, 2]),
original_edge_ids=None, original_edge_ids=None,
original_column_node_ids=tensor([3, 4, 0, 5, 1]), original_column_node_ids=tensor([3, 4, 0, 1, 5, 2]),
), ),
SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 0, 0, 1, 2]), SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 0, 0, 1, 2, 3]),
indices=tensor([1, 3]), indices=tensor([4, 4, 0]),
), ),
original_row_node_ids=tensor([3, 4, 0, 5, 1]), original_row_node_ids=tensor([3, 4, 0, 1, 5, 2]),
original_edge_ids=None, original_edge_ids=None,
original_column_node_ids=tensor([3, 4, 0, 5, 1]), original_column_node_ids=tensor([3, 4, 0, 1, 5, 2]),
)], )],
positive_node_pairs=(tensor([0, 1, 1, 2]), positive_node_pairs=(tensor([0, 1, 1, 2]),
tensor([0, 0, 1, 1])), tensor([0, 0, 1, 1])),
node_pairs_with_labels=((tensor([0, 1, 1, 2, 0, 1, 1, 2]), tensor([0, 0, 1, 1, 1, 1, 3, 4])), node_pairs_with_labels=((tensor([0, 1, 1, 2, 0, 0, 1, 1, 1, 1, 2, 2]), tensor([0, 0, 1, 1, 3, 4, 5, 4, 1, 0, 3, 4])),
tensor([1., 1., 1., 1., 0., 0., 0., 0.])), tensor([1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0.])),
node_pairs=(tensor([3, 4, 4, 0]), node_pairs=(tensor([3, 4, 4, 0]),
tensor([3, 3, 4, 4])), tensor([3, 3, 4, 4])),
node_features={'feat': tensor([[0.8672, 0.2276], node_features={'feat': tensor([[0.8672, 0.2276],
[0.5503, 0.8223], [0.5503, 0.8223],
[0.9634, 0.2294], [0.9634, 0.2294],
[0.6172, 0.7865],
[0.5160, 0.2486], [0.5160, 0.2486],
[0.6172, 0.7865]])}, [0.2109, 0.1089]])},
negative_srcs=tensor([[3], negative_srcs=None,
[4], negative_node_pairs=(tensor([0, 0, 1, 1, 1, 1, 2, 2]),
[4], tensor([3, 4, 5, 4, 1, 0, 3, 4])),
[0]]), negative_dsts=tensor([[1, 5],
negative_node_pairs=(tensor([0, 1, 1, 2]), [2, 5],
tensor([1, 1, 3, 4])), [4, 3],
negative_dsts=tensor([[4], [1, 5]]),
[4],
[5],
[1]]),
labels=None, labels=None,
input_nodes=tensor([3, 4, 0, 5, 1]), input_nodes=tensor([3, 4, 0, 1, 5, 2]),
edge_features=[{}, edge_features=[{},
{}], {}],
compacted_node_pairs=(tensor([0, 1, 1, 2]), compacted_node_pairs=(tensor([0, 1, 1, 2]),
tensor([0, 0, 1, 1])), tensor([0, 0, 1, 1])),
compacted_negative_srcs=tensor([[0], compacted_negative_srcs=None,
[1], compacted_negative_dsts=tensor([[3, 4],
[1], [5, 4],
[2]]), [1, 0],
compacted_negative_dsts=tensor([[1], [3, 4]]),
[1], blocks=[Block(num_src_nodes=6, num_dst_nodes=6, num_edges=3),
[3], Block(num_src_nodes=6, num_dst_nodes=6, num_edges=3)],
[4]]),
blocks=[Block(num_src_nodes=5, num_dst_nodes=5, num_edges=2),
Block(num_src_nodes=5, num_dst_nodes=5, num_edges=2)],
)""" )"""
), ),
str( str(
"""MiniBatch(seed_nodes=None, """MiniBatch(seed_nodes=None,
sampled_subgraphs=[SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 1]), sampled_subgraphs=[SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 1, 1, 2]),
indices=tensor([1]), indices=tensor([1, 0]),
), ),
original_row_node_ids=tensor([5, 4]), original_row_node_ids=tensor([5, 4, 0, 1]),
original_edge_ids=None, original_edge_ids=None,
original_column_node_ids=tensor([5, 4]), original_column_node_ids=tensor([5, 4, 0, 1]),
), ),
SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 1]), SampledSubgraphImpl(sampled_csc=CSCFormatBase(indptr=tensor([0, 0, 1, 1, 2]),
indices=tensor([1]), indices=tensor([1, 0]),
), ),
original_row_node_ids=tensor([5, 4]), original_row_node_ids=tensor([5, 4, 0, 1]),
original_edge_ids=None, original_edge_ids=None,
original_column_node_ids=tensor([5, 4]), original_column_node_ids=tensor([5, 4, 0, 1]),
)], )],
positive_node_pairs=(tensor([0, 1]), positive_node_pairs=(tensor([0, 1]),
tensor([0, 0])), tensor([0, 0])),
node_pairs_with_labels=((tensor([0, 1, 0, 1]), tensor([0, 0, 0, 0])), node_pairs_with_labels=((tensor([0, 1, 0, 0, 1, 1]), tensor([0, 0, 2, 1, 2, 3])),
tensor([1., 1., 0., 0.])), tensor([1., 1., 0., 0., 0., 0.])),
node_pairs=(tensor([5, 4]), node_pairs=(tensor([5, 4]),
tensor([5, 5])), tensor([5, 5])),
node_features={'feat': tensor([[0.5160, 0.2486], node_features={'feat': tensor([[0.5160, 0.2486],
[0.5503, 0.8223]])}, [0.5503, 0.8223],
negative_srcs=tensor([[5], [0.9634, 0.2294],
[4]]), [0.6172, 0.7865]])},
negative_node_pairs=(tensor([0, 1]), negative_srcs=None,
tensor([0, 0])), negative_node_pairs=(tensor([0, 0, 1, 1]),
negative_dsts=tensor([[5], tensor([2, 1, 2, 3])),
[5]]), negative_dsts=tensor([[0, 4],
[0, 1]]),
labels=None, labels=None,
input_nodes=tensor([5, 4]), input_nodes=tensor([5, 4, 0, 1]),
edge_features=[{}, edge_features=[{},
{}], {}],
compacted_node_pairs=(tensor([0, 1]), compacted_node_pairs=(tensor([0, 1]),
tensor([0, 0])), tensor([0, 0])),
compacted_negative_srcs=tensor([[0], compacted_negative_srcs=None,
[1]]), compacted_negative_dsts=tensor([[2, 1],
compacted_negative_dsts=tensor([[0], [2, 3]]),
[0]]), blocks=[Block(num_src_nodes=4, num_dst_nodes=4, num_edges=2),
blocks=[Block(num_src_nodes=2, num_dst_nodes=2, num_edges=1), Block(num_src_nodes=4, num_dst_nodes=4, num_edges=2)],
Block(num_src_nodes=2, num_dst_nodes=2, num_edges=1)],
)""" )"""
), ),
] ]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment