# test_propagate.py
import dgl
import networkx as nx
import backend as F
import unittest
import utils as U
from utils import parametrize_dtype

def create_graph(idtype):
    """Build a 5-node undirected path graph (0-1-2-3-4) as a DGL graph.

    The graph is placed on the backend's default context with the
    requested integer dtype for node/edge ids.
    """
    return dgl.from_networkx(nx.path_graph(5), idtype=idtype, device=F.ctx())

def mfunc(edges):
    """Message function: each edge carries its source node's 'x' feature."""
    msg = edges.src['x']
    return {'m': msg}

def rfunc(nodes):
    """Reduce function: add the sum of incoming messages to the node's 'x'.

    Sums the mailbox messages 'm' over the neighbor axis (axis 1) and
    accumulates the result into the existing 'x' feature.
    """
    incoming = F.sum(nodes.mailbox['m'], 1)
    updated = nodes.data['x'] + incoming
    return {'x': updated}

@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_dtype
def test_prop_nodes_bfs(idtype):
    """Propagate node updates in BFS order from node 0 on a 5-node path."""
    g = create_graph(idtype)
    g.ndata['x'] = F.ones((5, 2))
    dgl.prop_nodes_bfs(g, 0, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
    # pull nodes using bfs order will result in a cumsum[i] + data[i] + data[i+1]
    expected = F.tensor([[2., 2.], [4., 4.], [6., 6.], [8., 8.], [9., 9.]])
    assert F.allclose(g.ndata['x'], expected)

@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_dtype
def test_prop_edges_dfs(idtype):
    """Propagate edge-wise in DFS order from node 0, under three edge-set options."""
    g = create_graph(idtype)

    # Default: tree edges only.
    g.ndata['x'] = F.ones((5, 2))
    dgl.prop_edges_dfs(g, 0, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
    # snr using dfs results in a cumsum
    expected = F.tensor([[1., 1.], [2., 2.], [3., 3.], [4., 4.], [5., 5.]])
    assert F.allclose(g.ndata['x'], expected)

    # Include reverse edges as well.
    g.ndata['x'] = F.ones((5, 2))
    dgl.prop_edges_dfs(g, 0, has_reverse_edge=True, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
    # result is cumsum[i] + cumsum[i-1]
    expected = F.tensor([[1., 1.], [3., 3.], [5., 5.], [7., 7.], [9., 9.]])
    assert F.allclose(g.ndata['x'], expected)

    # Include non-tree edges as well.
    g.ndata['x'] = F.ones((5, 2))
    dgl.prop_edges_dfs(g, 0, has_nontree_edge=True, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
    # result is cumsum[i] + cumsum[i+1]
    expected = F.tensor([[3., 3.], [5., 5.], [7., 7.], [9., 9.], [5., 5.]])
    assert F.allclose(g.ndata['x'], expected)

@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_dtype
def test_prop_nodes_topo(idtype):
    """Topological propagation: rejects cyclic graphs, sums leaves into the root."""
    # bi-directional chain: contains cycles, so no topological order exists
    g = create_graph(idtype)
    assert U.check_fail(dgl.prop_nodes_topo, g)  # has loop

    # Tree with all edges pointing toward root 0:
    #       0
    #      / \
    #     1   2
    #        / \
    #       3   4
    # Built directly with dgl.graph((src, dst)) instead of the deprecated
    # mutable DGLGraph.add_nodes()/add_edge() + dgl.graph(tree.edges())
    # round-trip; the node count (5) is inferred from the largest node id,
    # and the edge list/order matches the original construction exactly.
    tree = dgl.graph(([1, 2, 3, 4], [0, 0, 2, 2]))
    # init node feature data
    tree.ndata['x'] = F.zeros((5, 2))
    # set all leaf nodes to be ones
    tree.nodes[[1, 3, 4]].data['x'] = F.ones((3, 2))
    dgl.prop_nodes_topo(tree, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
    # root node gets the sum of the three leaf features
    assert F.allclose(tree.nodes[0].data['x'], F.tensor([[3., 3.]]))

# Ad-hoc runner for executing this file directly (outside pytest).
# NOTE(review): each test is wrapped by @parametrize_dtype, so calling it with
# no arguments here may fail depending on what that decorator returns —
# confirm against utils.parametrize_dtype before relying on this entry point.
if __name__ == '__main__':
    test_prop_nodes_bfs()
    test_prop_edges_dfs()
    test_prop_nodes_topo()