Unverified Commit 7c2ea23a authored by peizhou001's avatar peizhou001 Committed by GitHub
Browse files

[Graphbolt] Change input/output of sample neighbors to hetero format (#5980)


Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-16-19.ap-northeast-1.compute.internal>
parent f7f4e73a
...@@ -3,13 +3,15 @@ ...@@ -3,13 +3,15 @@
import os import os
import tarfile import tarfile
import tempfile import tempfile
from typing import Dict, Optional, Tuple from collections import defaultdict
from typing import Dict, Optional, Tuple, Union
import torch import torch
from ...base import ETYPE from ...base import ETYPE
from ...convert import to_homogeneous from ...convert import to_homogeneous
from ...heterograph import DGLGraph from ...heterograph import DGLGraph
from .sampled_subgraph_impl import SampledSubgraphImpl
class GraphMetadata: class GraphMetadata:
...@@ -213,7 +215,125 @@ class CSCSamplingGraph: ...@@ -213,7 +215,125 @@ class CSCSamplingGraph:
# TODO: change the result to 'SampledSubgraphImpl'. # TODO: change the result to 'SampledSubgraphImpl'.
return self._c_csc_graph.in_subgraph(nodes) return self._c_csc_graph.in_subgraph(nodes)
def _convert_to_sampled_subgraph(
    self,
    C_sampled_subgraph: torch.ScriptObject,
):
    """Turn a fused sampled subgraph into a 'SampledSubgraphImpl'.

    The input stores its edges in compressed form (``indptr`` / ``indices``
    plus ``reverse_column_node_ids``). This helper expands it into explicit
    (row, column) node pairs.

    Parameters
    ----------
    C_sampled_subgraph: torch.ScriptObject
        The sampled subgraph object; its ``indptr``, ``indices``,
        ``reverse_column_node_ids`` and ``type_per_edge`` fields are read.

    Returns
    -------
    SampledSubgraphImpl
        - If ``type_per_edge`` is None, ``node_pairs`` is a single
          ``(row, column)`` tuple.
        - Otherwise ``node_pairs`` maps every edge type in
          ``self.metadata.edge_type_to_id`` to a ``(row, column)`` tuple
          whose IDs have the node type offsets subtracted.
    """
    indptr = C_sampled_subgraph.indptr
    # Difference of consecutive indptr entries = number of edges stored for
    # each column; repeat each column ID that many times so that every edge
    # gets an explicit column entry.
    edges_per_column = indptr[1:] - indptr[:-1]
    dst = C_sampled_subgraph.reverse_column_node_ids.repeat_interleave(
        edges_per_column
    )
    src = C_sampled_subgraph.indices
    edge_types = C_sampled_subgraph.type_per_edge

    if edge_types is None:
        # The sampled graph is already a homogeneous graph.
        return SampledSubgraphImpl(node_pairs=(src, dst))

    # The sampled graph is a fused homogenized graph, which needs to be
    # split back into one (row, column) pair per edge type.
    node_pairs = defaultdict(list)
    for etype, etype_id in self.metadata.edge_type_to_id.items():
        src_ntype, _, dst_ntype = etype
        src_offset = self.node_type_offset[
            self.metadata.node_type_to_id[src_ntype]
        ]
        dst_offset = self.node_type_offset[
            self.metadata.node_type_to_id[dst_ntype]
        ]
        # Keep only the edges of this type and shift the IDs by the start
        # offset of their node type.
        selected = edge_types == etype_id
        node_pairs[etype] = (src[selected] - src_offset, dst[selected] - dst_offset)
    return SampledSubgraphImpl(node_pairs=node_pairs)
def sample_neighbors(
    self,
    nodes: Union[torch.Tensor, Dict[str, torch.Tensor]],
    fanouts: torch.Tensor,
    replace: bool = False,
    probs_name: Optional[str] = None,
) -> SampledSubgraphImpl:
    """Sample neighboring edges of the given nodes and return the induced
    subgraph.

    Parameters
    ----------
    nodes: torch.Tensor or Dict[str, torch.Tensor]
        IDs of the given seed nodes.
          - If `nodes` is a tensor: the graph is treated as a homogeneous
            graph and the IDs inside are homogeneous IDs.
          - If `nodes` is a dictionary: the keys should be node types and
            the IDs inside are heterogeneous (per-type) IDs.
    fanouts: torch.Tensor
        The number of edges to be sampled for each node, with or without
        considering edge types.
          - When the length is 1, the fanout applies to all neighbors of
            the node as a collective, regardless of the edge type.
          - Otherwise, the length should equal the number of edge types,
            and each fanout value corresponds to a specific edge type of
            the nodes.
        The value of each fanout should be >= 0 or = -1.
          - When the value is -1, all neighbors will be chosen for
            sampling. It is equivalent to selecting all neighbors when
            the fanout is >= the number of neighbors (and replace is set
            to false).
          - When the value is a non-negative integer, it is the number of
            neighbors to sample for each node.
    replace: bool
        Boolean indicating whether the sampling is performed with or
        without replacement. If True, a value can be selected multiple
        times. Otherwise, each value can be selected only once.
    probs_name: str, optional
        An optional string specifying the name of an edge attribute. This
        attribute tensor should contain (unnormalized) probabilities
        corresponding to each neighboring edge of a node. It must be a 1D
        floating-point or boolean tensor, with the number of elements
        equalling the total number of edges.

    Returns
    -------
    SampledSubgraphImpl
        The sampled subgraph.

    Examples
    --------
    >>> import dgl.graphbolt as gb
    >>> ntypes = {'n1': 0, 'n2': 1, 'n3': 2}
    >>> etypes = {('n1', 'e1', 'n2'): 0, ('n1', 'e2', 'n3'): 1}
    >>> metadata = gb.GraphMetadata(ntypes, etypes)
    >>> indptr = torch.LongTensor([0, 3, 4, 5, 7])
    >>> indices = torch.LongTensor([0, 1, 3, 2, 3, 0, 1])
    >>> node_type_offset = torch.LongTensor([0, 2, 3, 4])
    >>> type_per_edge = torch.LongTensor([0, 0, 1, 0, 1, 0, 1])
    >>> graph = gb.from_csc(indptr, indices, type_per_edge=type_per_edge,
    ... node_type_offset=node_type_offset, metadata=metadata)
    >>> nodes = {'n1': torch.LongTensor([1]), 'n2': torch.LongTensor([0])}
    >>> fanouts = torch.tensor([1, 1])
    >>> subgraph = graph.sample_neighbors(nodes, fanouts)
    >>> print(subgraph.node_pairs)
    defaultdict(<class 'list'>, {('n1', 'e1', 'n2'): (tensor([2]), \
    tensor([1])), ('n1', 'e2', 'n3'): (tensor([3]), tensor([2]))})
    """
    if isinstance(nodes, dict):
        # Shift each per-type ID by the start offset of its node type and
        # concatenate everything into a single homogeneous seed tensor.
        nodes = torch.cat(
            [
                ids
                + self.node_type_offset[self.metadata.node_type_to_id[ntype]]
                for ntype, ids in nodes.items()
            ]
        )
    # The fourth positional argument is fixed to False here
    # (presumably `return_eids` -- confirm against `_sample_neighbors`).
    return self._convert_to_sampled_subgraph(
        self._sample_neighbors(nodes, fanouts, replace, False, probs_name)
    )
def _sample_neighbors(
self, self,
nodes: torch.Tensor, nodes: torch.Tensor,
fanouts: torch.Tensor, fanouts: torch.Tensor,
...@@ -261,27 +381,7 @@ class CSCSamplingGraph: ...@@ -261,27 +381,7 @@ class CSCSamplingGraph:
Returns Returns
------- -------
torch.classes.graphbolt.SampledSubgraph torch.classes.graphbolt.SampledSubgraph
The sampled subgraph. The sampled C subgraph.
Examples
--------
>>> indptr = torch.LongTensor([0, 3, 5, 7])
>>> indices = torch.LongTensor([0, 1, 4, 2, 3, 0, 1])
>>> type_per_edge = torch.LongTensor([0, 0, 1, 0, 1, 0, 1])
>>> graph = gb.from_csc(indptr, indices, type_per_edge=type_per_edge)
>>> nodes = torch.LongTensor([1, 2])
>>> fanouts = torch.tensor([1, 1])
>>> subgraph = graph.sample_neighbors(nodes, fanouts, return_eids=True)
>>> print(subgraph.indptr)
tensor([0, 2, 4])
>>> print(subgraph.indices)
tensor([2, 3, 0, 1])
>>> print(subgraph.reverse_column_node_ids)
tensor([1, 2])
>>> print(subgraph.reverse_edge_ids)
tensor([3, 4, 5, 6])
>>> print(subgraph.type_per_edge)
tensor([0, 1, 0, 1])
""" """
# Ensure nodes is 1-D tensor. # Ensure nodes is 1-D tensor.
assert nodes.dim() == 1, "Nodes should be 1-D tensor." assert nodes.dim() == 1, "Nodes should be 1-D tensor."
......
...@@ -366,7 +366,7 @@ def test_in_subgraph_heterogeneous(): ...@@ -366,7 +366,7 @@ def test_in_subgraph_heterogeneous():
F._default_context_str == "gpu", F._default_context_str == "gpu",
reason="Graph is CPU only at present.", reason="Graph is CPU only at present.",
) )
def test_sample_neighbors(): def test_sample_neighbors_homo():
"""Original graph in COO: """Original graph in COO:
1 0 1 0 1 1 0 1 0 1
1 0 1 1 0 1 0 1 1 0
...@@ -379,36 +379,83 @@ def test_sample_neighbors(): ...@@ -379,36 +379,83 @@ def test_sample_neighbors():
num_edges = 12 num_edges = 12
indptr = torch.LongTensor([0, 3, 5, 7, 9, 12]) indptr = torch.LongTensor([0, 3, 5, 7, 9, 12])
indices = torch.LongTensor([0, 1, 4, 2, 3, 0, 1, 1, 2, 0, 3, 4]) indices = torch.LongTensor([0, 1, 4, 2, 3, 0, 1, 1, 2, 0, 3, 4])
type_per_edge = torch.LongTensor([0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1])
assert indptr[-1] == num_edges assert indptr[-1] == num_edges
assert indptr[-1] == len(indices) assert indptr[-1] == len(indices)
ntypes = {"n1": 0, "n2": 1, "n3": 2}
etypes = {("n1", "e1", "n2"): 0, ("n1", "e2", "n3"): 1} # Construct CSCSamplingGraph.
graph = gb.from_csc(indptr, indices)
# Generate subgraph via sample neighbors.
nodes = torch.LongTensor([1, 3, 4])
subgraph = graph.sample_neighbors(nodes, fanouts=torch.LongTensor([2]))
# Verify in subgraph.
sampled_num = subgraph.node_pairs[0].size(0)
assert sampled_num == 6
assert subgraph.reverse_column_node_ids is None
assert subgraph.reverse_row_node_ids is None
assert subgraph.reverse_edge_ids is None
@unittest.skipIf(
F._default_context_str == "gpu",
reason="Graph is CPU only at present.",
)
def test_sample_neighbors_hetero():
"""Original graph in COO:
("n1", "e1", "n2"):[0, 0, 1, 1, 1], [0, 2, 0, 1, 2]
("n2", "e2", "n1"):[0, 0, 1, 2], [0, 1, 1 ,0]
0 0 1 0 1
0 0 1 1 1
1 1 0 0 0
0 1 0 0 0
1 0 0 0 0
"""
# Initialize data.
ntypes = {"n1": 0, "n2": 1}
etypes = {("n1", "e1", "n2"): 0, ("n2", "e2", "n1"): 1}
metadata = gb.GraphMetadata(ntypes, etypes) metadata = gb.GraphMetadata(ntypes, etypes)
num_nodes = 5
num_edges = 9
indptr = torch.LongTensor([0, 2, 4, 6, 7, 9])
indices = torch.LongTensor([2, 4, 2, 3, 0, 1, 1, 0, 1])
type_per_edge = torch.LongTensor([1, 1, 1, 1, 0, 0, 0, 0, 0])
node_type_offset = torch.LongTensor([0, 2, 5])
assert indptr[-1] == num_edges
assert indptr[-1] == len(indices)
# Construct CSCSamplingGraph. # Construct CSCSamplingGraph.
graph = gb.from_csc( graph = gb.from_csc(
indptr, indices, type_per_edge=type_per_edge, metadata=metadata indptr,
indices,
node_type_offset=node_type_offset,
type_per_edge=type_per_edge,
metadata=metadata,
) )
# Generate subgraph via sample neighbors. # Generate subgraph via sample neighbors.
nodes = torch.LongTensor([1, 3, 4]) nodes = {"n1": torch.LongTensor([0]), "n2": torch.LongTensor([0])}
fanouts = torch.tensor([2, 2]) fanouts = torch.tensor([-1, -1])
subgraph = graph.sample_neighbors(nodes, fanouts, return_eids=True) subgraph = graph.sample_neighbors(nodes, fanouts)
# Verify in subgraph. # Verify in subgraph.
assert torch.equal(subgraph.indptr, torch.LongTensor([0, 2, 4, 7])) expected_node_pairs = {
assert torch.equal( ("n1", "e1", "n2"): (
torch.sort(subgraph.indices)[0], torch.LongTensor([0, 1]),
torch.sort(torch.LongTensor([2, 3, 1, 2, 0, 3, 4]))[0], torch.LongTensor([0, 0]),
) ),
assert torch.equal(subgraph.reverse_column_node_ids, nodes) ("n2", "e2", "n1"): (
assert torch.equal( torch.LongTensor([0, 2]),
subgraph.reverse_edge_ids, torch.LongTensor([3, 4, 7, 8, 9, 10, 11]) torch.LongTensor([0, 0]),
) ),
assert torch.equal( }
subgraph.type_per_edge, torch.LongTensor([0, 1, 0, 1, 0, 0, 1]) assert len(subgraph.node_pairs) == 2
) for etype, pairs in expected_node_pairs.items():
assert torch.equal(subgraph.node_pairs[etype][0], pairs[0])
assert torch.equal(subgraph.node_pairs[etype][1], pairs[1])
assert subgraph.reverse_column_node_ids is None
assert subgraph.reverse_row_node_ids is None assert subgraph.reverse_row_node_ids is None
assert subgraph.reverse_edge_ids is None
@unittest.skipIf( @unittest.skipIf(
...@@ -416,53 +463,68 @@ def test_sample_neighbors(): ...@@ -416,53 +463,68 @@ def test_sample_neighbors():
reason="Graph is CPU only at present.", reason="Graph is CPU only at present.",
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
"fanouts, expected_sampled_num", "fanouts, expected_sampled_num1, expected_sampled_num2",
[ [
([0], 0), ([0], 0, 0),
([1], 3), ([1], 1, 1),
([2], 6), ([2], 2, 2),
([4], 7), ([4], 2, 2),
([-1], 7), ([-1], 2, 2),
([0, 0], 0), ([0, 0], 0, 0),
([1, 0], 3), ([1, 0], 1, 0),
([1, 1], 6), ([0, 1], 0, 1),
([2, 2], 7), ([1, 1], 1, 1),
([-1, -1], 7), ([2, 1], 2, 1),
([-1, -1], 2, 2),
], ],
) )
def test_sample_neighbors_fanouts(fanouts, expected_sampled_num): def test_sample_neighbors_fanouts(
fanouts, expected_sampled_num1, expected_sampled_num2
):
"""Original graph in COO: """Original graph in COO:
1 0 1 0 1 ("n1", "e1", "n2"):[0, 0, 1, 1, 1], [0, 2, 0, 1, 2]
1 0 1 1 0 ("n2", "e2", "n1"):[0, 0, 1, 2], [0, 1, 1 ,0]
0 1 0 1 0 0 0 1 0 1
0 1 0 0 1 0 0 1 1 1
1 0 0 0 1 1 1 0 0 0
0 1 0 0 0
1 0 0 0 0
""" """
# Initialize data. # Initialize data.
ntypes = {"n1": 0, "n2": 1}
etypes = {("n1", "e1", "n2"): 0, ("n2", "e2", "n1"): 1}
metadata = gb.GraphMetadata(ntypes, etypes)
num_nodes = 5 num_nodes = 5
num_edges = 12 num_edges = 9
indptr = torch.LongTensor([0, 3, 5, 7, 9, 12]) indptr = torch.LongTensor([0, 2, 4, 6, 7, 9])
indices = torch.LongTensor([0, 1, 4, 2, 3, 0, 1, 1, 2, 0, 3, 4]) indices = torch.LongTensor([2, 4, 2, 3, 0, 1, 1, 0, 1])
type_per_edge = torch.LongTensor([0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1]) type_per_edge = torch.LongTensor([1, 1, 1, 1, 0, 0, 0, 0, 0])
node_type_offset = torch.LongTensor([0, 2, 5])
assert indptr[-1] == num_edges assert indptr[-1] == num_edges
assert indptr[-1] == len(indices) assert indptr[-1] == len(indices)
ntypes = {"n1": 0, "n2": 1, "n3": 2}
etypes = {("n1", "e1", "n2"): 0, ("n1", "e2", "n3"): 1}
metadata = gb.GraphMetadata(ntypes, etypes)
# Construct CSCSamplingGraph. # Construct CSCSamplingGraph.
graph = gb.from_csc( graph = gb.from_csc(
indptr, indices, type_per_edge=type_per_edge, metadata=metadata indptr,
indices,
node_type_offset=node_type_offset,
type_per_edge=type_per_edge,
metadata=metadata,
) )
# Generate subgraph via sample neighbors. nodes = {"n1": torch.LongTensor([0]), "n2": torch.LongTensor([0])}
nodes = torch.LongTensor([1, 3, 4])
fanouts = torch.LongTensor(fanouts) fanouts = torch.LongTensor(fanouts)
subgraph = graph.sample_neighbors(nodes, fanouts) subgraph = graph.sample_neighbors(nodes, fanouts)
# Verify in subgraph. # Verify in subgraph.
sampled_num = subgraph.indices.size(0) assert (
assert sampled_num == expected_sampled_num subgraph.node_pairs[("n1", "e1", "n2")][0].numel()
== expected_sampled_num1
)
assert (
subgraph.node_pairs[("n2", "e2", "n1")][0].numel()
== expected_sampled_num2
)
@unittest.skipIf( @unittest.skipIf(
...@@ -470,36 +532,57 @@ def test_sample_neighbors_fanouts(fanouts, expected_sampled_num): ...@@ -470,36 +532,57 @@ def test_sample_neighbors_fanouts(fanouts, expected_sampled_num):
reason="Graph is CPU only at present.", reason="Graph is CPU only at present.",
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
"replace, expected_sampled_num", [(False, 7), (True, 12)] "replace, expected_sampled_num1, expected_sampled_num2",
[(False, 2, 2), (True, 4, 4)],
) )
def test_sample_neighbors_replace(replace, expected_sampled_num): def test_sample_neighbors_replace(
replace, expected_sampled_num1, expected_sampled_num2
):
"""Original graph in COO: """Original graph in COO:
1 0 1 0 1 ("n1", "e1", "n2"):[0, 0, 1, 1, 1], [0, 2, 0, 1, 2]
1 0 1 1 0 ("n2", "e2", "n1"):[0, 0, 1, 2], [0, 1, 1 ,0]
0 1 0 1 0 0 0 1 0 1
0 1 0 0 1 0 0 1 1 1
1 0 0 0 1 1 1 0 0 0
0 1 0 0 0
1 0 0 0 0
""" """
# Initialize data. # Initialize data.
ntypes = {"n1": 0, "n2": 1}
etypes = {("n1", "e1", "n2"): 0, ("n2", "e2", "n1"): 1}
metadata = gb.GraphMetadata(ntypes, etypes)
num_nodes = 5 num_nodes = 5
num_edges = 12 num_edges = 9
indptr = torch.LongTensor([0, 3, 5, 7, 9, 12]) indptr = torch.LongTensor([0, 2, 4, 6, 7, 9])
indices = torch.LongTensor([0, 1, 4, 2, 3, 0, 1, 1, 2, 0, 3, 4]) indices = torch.LongTensor([2, 4, 2, 3, 0, 1, 1, 0, 1])
type_per_edge = torch.LongTensor([1, 1, 1, 1, 0, 0, 0, 0, 0])
node_type_offset = torch.LongTensor([0, 2, 5])
assert indptr[-1] == num_edges assert indptr[-1] == num_edges
assert indptr[-1] == len(indices) assert indptr[-1] == len(indices)
# Construct CSCSamplingGraph. # Construct CSCSamplingGraph.
graph = gb.from_csc(indptr, indices) graph = gb.from_csc(
indptr,
indices,
node_type_offset=node_type_offset,
type_per_edge=type_per_edge,
metadata=metadata,
)
# Generate subgraph via sample neighbors. nodes = {"n1": torch.LongTensor([0]), "n2": torch.LongTensor([0])}
nodes = torch.LongTensor([1, 3, 4])
subgraph = graph.sample_neighbors( subgraph = graph.sample_neighbors(
nodes, fanouts=torch.LongTensor([4]), replace=replace nodes, torch.LongTensor([4]), replace=replace
) )
# Verify in subgraph. # Verify in subgraph.
sampled_num = subgraph.indices.size(0) assert (
assert sampled_num == expected_sampled_num subgraph.node_pairs[("n1", "e1", "n2")][0].numel()
== expected_sampled_num1
)
assert (
subgraph.node_pairs[("n2", "e2", "n1")][0].numel()
== expected_sampled_num2
)
@unittest.skipIf( @unittest.skipIf(
...@@ -544,7 +627,7 @@ def test_sample_neighbors_probs(replace, probs_name): ...@@ -544,7 +627,7 @@ def test_sample_neighbors_probs(replace, probs_name):
) )
# Verify in subgraph. # Verify in subgraph.
sampled_num = subgraph.indices.size(0) sampled_num = subgraph.node_pairs[0].size(0)
if replace: if replace:
assert sampled_num == 6 assert sampled_num == 6
else: else:
...@@ -587,7 +670,7 @@ def test_sample_neighbors_zero_probs(replace, probs_or_mask): ...@@ -587,7 +670,7 @@ def test_sample_neighbors_zero_probs(replace, probs_or_mask):
) )
# Verify in subgraph. # Verify in subgraph.
sampled_num = subgraph.indices.size(0) sampled_num = subgraph.node_pairs[0].size(0)
assert sampled_num == 0 assert sampled_num == 0
......
...@@ -19,7 +19,7 @@ def test_DataLoader(): ...@@ -19,7 +19,7 @@ def test_DataLoader():
for hop in range(2): for hop in range(2):
sg = graph.sample_neighbors(seeds, torch.LongTensor([2])) sg = graph.sample_neighbors(seeds, torch.LongTensor([2]))
seeds = sg.indices seeds = sg.node_pairs[0]
adjs.insert(0, sg) adjs.insert(0, sg)
input_nodes = seeds input_nodes = seeds
......
...@@ -15,7 +15,7 @@ def get_graphbolt_sampler_func(): ...@@ -15,7 +15,7 @@ def get_graphbolt_sampler_func():
for hop in range(2): for hop in range(2):
sg = graph.sample_neighbors(seeds, torch.LongTensor([2])) sg = graph.sample_neighbors(seeds, torch.LongTensor([2]))
seeds = sg.indices seeds = sg.node_pairs[0]
adjs.insert(0, sg) adjs.insert(0, sg)
return seeds, data, adjs return seeds, data, adjs
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment