test_subgraph_sampler.py 5.94 KB
Newer Older
1
import dgl.graphbolt as gb
2
3
4
5
import gb_test_utils
import pytest
import torch
import torchdata.datapipes as dp
6
from torchdata.datapipes.iter import Mapper
7
8


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def test_SubgraphSampler_invoke():
    """Check that the base ``SubgraphSampler`` raises ``NotImplementedError``.

    The class-constructor form and the ``sample_subgraph`` functional form are
    each built directly on the same ``ItemSampler``. Chaining the functional
    form onto the constructor-built sampler would let the upstream sampler
    raise first, so the functional form itself would never be exercised.
    """
    itemset = gb.ItemSet(torch.arange(10), names="seed_nodes")
    item_sampler = gb.ItemSampler(itemset, batch_size=2)

    # Invoke via class constructor.
    datapipe = gb.SubgraphSampler(item_sampler)
    with pytest.raises(NotImplementedError):
        next(iter(datapipe))

    # Invoke via functional form.
    datapipe = item_sampler.sample_subgraph()
    with pytest.raises(NotImplementedError):
        next(iter(datapipe))


@pytest.mark.parametrize("labor", [False, True])
def test_NeighborSampler_invoke(labor):
    """Check both ways of constructing a neighbor sampler yield 5 batches.

    ``labor`` selects ``gb.LayerNeighborSampler`` (True) or
    ``gb.NeighborSampler`` (False). The constructor form and the functional
    form are each attached directly to the ``ItemSampler`` so that they are
    tested independently instead of being stacked on top of one another.
    """
    graph = gb_test_utils.rand_csc_graph(20, 0.15)
    itemset = gb.ItemSet(torch.arange(10), names="seed_nodes")
    item_sampler = gb.ItemSampler(itemset, batch_size=2)
    num_layer = 2
    fanouts = [torch.LongTensor([2]) for _ in range(num_layer)]

    # Invoke via class constructor.
    Sampler = gb.LayerNeighborSampler if labor else gb.NeighborSampler
    datapipe = Sampler(item_sampler, graph, fanouts)
    # 10 seed nodes with batch_size=2 are expected to give 5 mini-batches.
    assert len(list(datapipe)) == 5

    # Invoke via functional form.
    if labor:
        datapipe = item_sampler.sample_layer_neighbor(graph, fanouts)
    else:
        datapipe = item_sampler.sample_neighbor(graph, fanouts)
    assert len(list(datapipe)) == 5


45
46
@pytest.mark.parametrize("labor", [False, True])
def test_SubgraphSampler_Node(labor):
    """Sample around seed nodes and check the number of mini-batches.

    ``labor`` selects ``gb.LayerNeighborSampler`` (True) or
    ``gb.NeighborSampler`` (False).
    """
    graph = gb_test_utils.rand_csc_graph(20, 0.15)
    seeds = gb.ItemSet(torch.arange(10), names="seed_nodes")
    item_sampler = gb.ItemSampler(seeds, batch_size=2)

    # Two sampling layers, each with a fanout of 2.
    num_layer = 2
    fanouts = [torch.LongTensor([2]) for _ in range(num_layer)]

    Sampler = gb.LayerNeighborSampler if labor else gb.NeighborSampler
    sampler_dp = Sampler(item_sampler, graph, fanouts)

    # 10 seed nodes with batch_size=2 are expected to give 5 mini-batches.
    assert len(list(sampler_dp)) == 5
55
56


57
def to_link_batch(data):
    """Wrap ``data`` in a ``gb.MiniBatch`` as its ``node_pairs`` field."""
    return gb.MiniBatch(node_pairs=data)
60
61


62
63
@pytest.mark.parametrize("labor", [False, True])
def test_SubgraphSampler_Link(labor):
    """Sample around node pairs and check the number of mini-batches.

    ``labor`` selects ``gb.LayerNeighborSampler`` (True) or
    ``gb.NeighborSampler`` (False).
    """
    graph = gb_test_utils.rand_csc_graph(20, 0.15)

    # 20 consecutive ids reshaped into 10 (u, v) rows.
    pairs = torch.arange(0, 20).reshape(-1, 2)
    itemset = gb.ItemSet(pairs, names="node_pairs")
    item_sampler = gb.ItemSampler(itemset, batch_size=2)

    # Two sampling layers, each with a fanout of 2.
    num_layer = 2
    fanouts = [torch.LongTensor([2]) for _ in range(num_layer)]

    Sampler = gb.LayerNeighborSampler if labor else gb.NeighborSampler
    neighbor_dp = Sampler(item_sampler, graph, fanouts)

    # 10 node pairs with batch_size=2 are expected to give 5 mini-batches.
    assert len(list(neighbor_dp)) == 5
72
73


74
75
76
77
78
79
80
81
82
@pytest.mark.parametrize(
    "format",
    [
        gb.LinkPredictionEdgeFormat.INDEPENDENT,
        gb.LinkPredictionEdgeFormat.CONDITIONED,
        gb.LinkPredictionEdgeFormat.HEAD_CONDITIONED,
        gb.LinkPredictionEdgeFormat.TAIL_CONDITIONED,
    ],
)
@pytest.mark.parametrize("labor", [False, True])
def test_SubgraphSampler_Link_With_Negative(format, labor):
    """Sample around node pairs after negative sampling.

    ``format`` is one of the ``gb.LinkPredictionEdgeFormat`` members and is
    forwarded to ``gb.UniformNegativeSampler``. ``labor`` selects
    ``gb.LayerNeighborSampler`` (True) or ``gb.NeighborSampler`` (False).
    """
    graph = gb_test_utils.rand_csc_graph(20, 0.15)

    # 20 consecutive ids reshaped into 10 (u, v) rows.
    pairs = torch.arange(0, 20).reshape(-1, 2)
    itemset = gb.ItemSet(pairs, names="node_pairs")
    item_sampler = gb.ItemSampler(itemset, batch_size=2)

    # Two sampling layers, each with a fanout of 2.
    num_layer = 2
    fanouts = [torch.LongTensor([2]) for _ in range(num_layer)]

    # The positional ``1`` is presumably the negative ratio -- TODO confirm
    # against the UniformNegativeSampler signature.
    negative_dp = gb.UniformNegativeSampler(item_sampler, 1, format, graph)

    Sampler = gb.LayerNeighborSampler if labor else gb.NeighborSampler
    neighbor_dp = Sampler(negative_dp, graph, fanouts)

    # 10 node pairs with batch_size=2 are expected to give 5 mini-batches.
    assert len(list(neighbor_dp)) == 5
94
95


96
97
98
99
100
101
102
def get_hetero_graph():
    """Build a small fixed heterogeneous graph via ``gb.from_csc``.

    The graph has 5 nodes (2 of type ``n1`` followed by 3 of type ``n2``,
    per ``node_type_offset``) and 10 edges split over the edge types
    ``n1:e1:n2`` (id 0) and ``n2:e2:n1`` (id 1).
    """
    # COO graph:
    # [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
    # [2, 4, 2, 3, 0, 1, 1, 0, 0, 1]
    # [1, 1, 1, 1, 0, 0, 0, 0, 0, 0] -> edge type.
    # num_nodes = 5, num_n1 = 2, num_n2 = 3
    ntypes = {"n1": 0, "n2": 1}
    etypes = {"n1:e1:n2": 0, "n2:e2:n1": 1}
    metadata = gb.GraphMetadata(ntypes, etypes)

    indptr = torch.LongTensor([0, 2, 4, 6, 8, 10])
    indices = torch.LongTensor([2, 4, 2, 3, 0, 1, 1, 0, 0, 1])
    # One edge-type id per entry of ``indices``.
    type_per_edge = torch.LongTensor([1, 1, 1, 1, 0, 0, 0, 0, 0, 0])
    node_type_offset = torch.LongTensor([0, 2, 5])

    return gb.from_csc(
        indptr,
        indices,
        node_type_offset=node_type_offset,
        type_per_edge=type_per_edge,
        metadata=metadata,
    )
116
117


118
119
@pytest.mark.parametrize("labor", [False, True])
def test_SubgraphSampler_Link_Hetero(labor):
    """Sample around typed node pairs on the heterogeneous test graph.

    ``labor`` selects ``gb.LayerNeighborSampler`` (True) or
    ``gb.NeighborSampler`` (False).
    """
    graph = get_hetero_graph()

    # Node pairs per edge type: 4 for "n1:e1:n2" and 6 for "n2:e2:n1".
    e1_pairs = torch.LongTensor([[0, 0, 1, 1], [0, 2, 0, 1]]).T
    e2_pairs = torch.LongTensor([[0, 0, 1, 1, 2, 2], [0, 1, 1, 0, 0, 1]]).T
    itemset = gb.ItemSetDict(
        {
            "n1:e1:n2": gb.ItemSet(e1_pairs, names="node_pairs"),
            "n2:e2:n1": gb.ItemSet(e2_pairs, names="node_pairs"),
        }
    )
    item_sampler = gb.ItemSampler(itemset, batch_size=2)

    # Two sampling layers, each with a fanout of 2.
    num_layer = 2
    fanouts = [torch.LongTensor([2]) for _ in range(num_layer)]

    Sampler = gb.LayerNeighborSampler if labor else gb.NeighborSampler
    neighbor_dp = Sampler(item_sampler, graph, fanouts)

    # 10 node pairs in total with batch_size=2 are expected to give 5
    # mini-batches.
    assert len(list(neighbor_dp)) == 5
140
141
142


@pytest.mark.parametrize(
    "format",
    [
        gb.LinkPredictionEdgeFormat.INDEPENDENT,
        gb.LinkPredictionEdgeFormat.CONDITIONED,
        gb.LinkPredictionEdgeFormat.HEAD_CONDITIONED,
        gb.LinkPredictionEdgeFormat.TAIL_CONDITIONED,
    ],
)
@pytest.mark.parametrize("labor", [False, True])
def test_SubgraphSampler_Link_Hetero_With_Negative(format, labor):
    """Sample around typed node pairs after negative sampling.

    ``format`` is one of the ``gb.LinkPredictionEdgeFormat`` members and is
    forwarded to ``gb.UniformNegativeSampler``. ``labor`` selects
    ``gb.LayerNeighborSampler`` (True) or ``gb.NeighborSampler`` (False).
    """
    graph = get_hetero_graph()

    # Node pairs per edge type: 4 for "n1:e1:n2" and 6 for "n2:e2:n1".
    e1_pairs = torch.LongTensor([[0, 0, 1, 1], [0, 2, 0, 1]]).T
    e2_pairs = torch.LongTensor([[0, 0, 1, 1, 2, 2], [0, 1, 1, 0, 0, 1]]).T
    itemset = gb.ItemSetDict(
        {
            "n1:e1:n2": gb.ItemSet(e1_pairs, names="node_pairs"),
            "n2:e2:n1": gb.ItemSet(e2_pairs, names="node_pairs"),
        }
    )
    item_sampler = gb.ItemSampler(itemset, batch_size=2)

    # Two sampling layers, each with a fanout of 2.
    num_layer = 2
    fanouts = [torch.LongTensor([2]) for _ in range(num_layer)]

    # The positional ``1`` is presumably the negative ratio -- TODO confirm
    # against the UniformNegativeSampler signature.
    negative_dp = gb.UniformNegativeSampler(item_sampler, 1, format, graph)

    Sampler = gb.LayerNeighborSampler if labor else gb.NeighborSampler
    neighbor_dp = Sampler(negative_dp, graph, fanouts)

    # 10 node pairs in total with batch_size=2 are expected to give 5
    # mini-batches.
    assert len(list(neighbor_dp)) == 5