"...community/stable_diffusion_xl_controlnet_reference.py" did not exist on "a5fc62f81957c739b1d4a8fd99bf551a2949dc3c"
test_propagate.py 2.73 KB
Newer Older
1
2
import dgl
import networkx as nx
3
import backend as F
4
import unittest
5
import utils as U
6
from utils import parametrize_dtype
7
8
9
10
11

def mfunc(edges):
    return {'m' : edges.src['x']}

def rfunc(nodes):
12
    msg = F.sum(nodes.mailbox['m'], 1)
13
14
    return {'x' : nodes.data['x'] + msg}

15
16
17
18
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_dtype
def test_prop_nodes_bfs(idtype):
    g = dgl.graph(nx.path_graph(5), idtype=idtype, device=F.ctx())
19
    g.ndata['x'] = F.ones((5, 2))
20
    dgl.prop_nodes_bfs(g, 0, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
21
    # pull nodes using bfs order will result in a cumsum[i] + data[i] + data[i+1]
22
23
    assert F.allclose(g.ndata['x'],
            F.tensor([[2., 2.], [4., 4.], [6., 6.], [8., 8.], [9., 9.]]))
24

25
26
27
28
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_dtype
def test_prop_edges_dfs(idtype):
    g = dgl.graph(nx.path_graph(5), idtype=idtype, device=F.ctx())
29
    g.ndata['x'] = F.ones((5, 2))
30
    dgl.prop_edges_dfs(g, 0, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
31
    # snr using dfs results in a cumsum
32
33
    assert F.allclose(g.ndata['x'],
            F.tensor([[1., 1.], [2., 2.], [3., 3.], [4., 4.], [5., 5.]]))
34

35
    g.ndata['x'] = F.ones((5, 2))
36
    dgl.prop_edges_dfs(g, 0, has_reverse_edge=True, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
37
    # result is cumsum[i] + cumsum[i-1]
38
39
    assert F.allclose(g.ndata['x'],
            F.tensor([[1., 1.], [3., 3.], [5., 5.], [7., 7.], [9., 9.]]))
40

41
    g.ndata['x'] = F.ones((5, 2))
42
    dgl.prop_edges_dfs(g, 0, has_nontree_edge=True, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
43
    # result is cumsum[i] + cumsum[i+1]
44
45
    assert F.allclose(g.ndata['x'],
            F.tensor([[3., 3.], [5., 5.], [7., 7.], [9., 9.], [5., 5.]]))
46

47
48
49
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_dtype
def test_prop_nodes_topo(idtype):
50
    # bi-directional chain
51
    g = dgl.graph(nx.path_graph(5), idtype=idtype, device=F.ctx())
52
53
54
55
56
57
58
59
60
    assert U.check_fail(dgl.prop_nodes_topo, g)  # has loop

    # tree
    tree = dgl.DGLGraph()
    tree.add_nodes(5)
    tree.add_edge(1, 0)
    tree.add_edge(2, 0)
    tree.add_edge(3, 2)
    tree.add_edge(4, 2)
61
    tree = dgl.graph(tree.edges())
62
    # init node feature data
63
    tree.ndata['x'] = F.zeros((5, 2))
64
    # set all leaf nodes to be ones
65
    tree.nodes[[1, 3, 4]].data['x'] = F.ones((3, 2))
66
    dgl.prop_nodes_topo(tree, message_func=mfunc, reduce_func=rfunc, apply_node_func=None)
67
    # root node get the sum
68
    assert F.allclose(tree.nodes[0].data['x'], F.tensor([[3., 3.]]))
69
70
71
72
73

if __name__ == '__main__':
    test_prop_nodes_bfs()
    test_prop_edges_dfs()
    test_prop_nodes_topo()