test_partition.py 5.22 KB
Newer Older
1
2
import dgl
import sys
3
import os
4
5
6
7
8
import numpy as np
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from dgl.graph_index import create_graph_index
from dgl.distributed import partition_graph, load_partition
9
from dgl import function as fn
10
11
12
import backend as F
import unittest
import pickle
13
import random
14
15

def create_random_graph(n):
Jinjing Zhou's avatar
Jinjing Zhou committed
16
    arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
17
18
19
    ig = create_graph_index(arr, readonly=True)
    return dgl.DGLGraph(ig)

Da Zheng's avatar
Da Zheng committed
20
def check_partition(reshuffle):
21
22
23
    g = create_random_graph(10000)
    g.ndata['labels'] = F.arange(0, g.number_of_nodes())
    g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10))
24
25
26
    g.edata['feats'] = F.tensor(np.random.randn(g.number_of_edges(), 10))
    g.update_all(fn.copy_src('feats', 'msg'), fn.sum('msg', 'h'))
    g.update_all(fn.copy_edge('feats', 'msg'), fn.sum('msg', 'eh'))
27
28
    num_parts = 4
    num_hops = 2
Da Zheng's avatar
Da Zheng committed
29
30
31
32

    partition_graph(g, 'test', num_parts, '/tmp/partition', num_hops=num_hops,
                    part_method='metis', reshuffle=reshuffle)
    part_sizes = []
33
    for i in range(num_parts):
Da Zheng's avatar
Da Zheng committed
34
        part_g, node_feats, edge_feats, gpb = load_partition('/tmp/partition/test.json', i)
35
36

        # Check the metadata
Da Zheng's avatar
Da Zheng committed
37
38
39
40
41
42
43
44
45
46
47
        assert gpb._num_nodes() == g.number_of_nodes()
        assert gpb._num_edges() == g.number_of_edges()

        assert gpb.num_partitions() == num_parts
        gpb_meta = gpb.metadata()
        assert len(gpb_meta) == num_parts
        assert len(gpb.partid2nids(i)) == gpb_meta[i]['num_nodes']
        assert len(gpb.partid2eids(i)) == gpb_meta[i]['num_edges']
        part_sizes.append((gpb_meta[i]['num_nodes'], gpb_meta[i]['num_edges']))

        local_nid = gpb.nid2localnid(F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node']), i)
48
        assert F.dtype(local_nid) in (F.int64, F.int32)
Da Zheng's avatar
Da Zheng committed
49
50
        assert np.all(F.asnumpy(local_nid) == np.arange(0, len(local_nid)))
        local_eid = gpb.eid2localeid(F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge']), i)
51
        assert F.dtype(local_eid) in (F.int64, F.int32)
Da Zheng's avatar
Da Zheng committed
52
        assert np.all(F.asnumpy(local_eid) == np.arange(0, len(local_eid)))
53
54

        # Check the node map.
55
56
57
        local_nodes = F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node'])
        llocal_nodes = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nodes1 = gpb.partid2nids(i)
58
        assert F.dtype(local_nodes1) in (F.int32, F.int64)
59
        assert np.all(np.sort(F.asnumpy(local_nodes)) == np.sort(F.asnumpy(local_nodes1)))
60
61

        # Check the edge map.
62
63
        local_edges = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge'])
        local_edges1 = gpb.partid2eids(i)
64
        assert F.dtype(local_edges1) in (F.int32, F.int64)
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
        assert np.all(np.sort(F.asnumpy(local_edges)) == np.sort(F.asnumpy(local_edges1)))

        if reshuffle:
            part_g.ndata['feats'] = F.gather_row(g.ndata['feats'], part_g.ndata['orig_id'])
            part_g.edata['feats'] = F.gather_row(g.edata['feats'], part_g.edata['orig_id'])
            # when we read node data from the original global graph, we should use orig_id.
            local_nodes = F.boolean_mask(part_g.ndata['orig_id'], part_g.ndata['inner_node'])
            local_edges = F.boolean_mask(part_g.edata['orig_id'], part_g.edata['inner_edge'])
        else:
            part_g.ndata['feats'] = F.gather_row(g.ndata['feats'], part_g.ndata[dgl.NID])
            part_g.edata['feats'] = F.gather_row(g.edata['feats'], part_g.edata[dgl.NID])
        part_g.update_all(fn.copy_src('feats', 'msg'), fn.sum('msg', 'h'))
        part_g.update_all(fn.copy_edge('feats', 'msg'), fn.sum('msg', 'eh'))
        assert F.allclose(F.gather_row(g.ndata['h'], local_nodes),
                          F.gather_row(part_g.ndata['h'], llocal_nodes))
        assert F.allclose(F.gather_row(g.ndata['eh'], local_nodes),
                          F.gather_row(part_g.ndata['eh'], llocal_nodes))
82
83
84
85

        for name in ['labels', 'feats']:
            assert name in node_feats
            assert node_feats[name].shape[0] == len(local_nodes)
86
87
88
89
90
            assert np.all(F.asnumpy(g.ndata[name])[F.asnumpy(local_nodes)] == F.asnumpy(node_feats[name]))
        for name in ['feats']:
            assert name in edge_feats
            assert edge_feats[name].shape[0] == len(local_edges)
            assert np.all(F.asnumpy(g.edata[name])[F.asnumpy(local_edges)] == F.asnumpy(edge_feats[name]))
91

Da Zheng's avatar
Da Zheng committed
92
93
94
95
96
97
98
99
    if reshuffle:
        node_map = []
        edge_map = []
        for i, (num_nodes, num_edges) in enumerate(part_sizes):
            node_map.append(np.ones(num_nodes) * i)
            edge_map.append(np.ones(num_edges) * i)
        node_map = np.concatenate(node_map)
        edge_map = np.concatenate(edge_map)
100
101
102
103
104
105
        nid2pid = gpb.nid2partid(F.arange(0, len(node_map)))
        assert F.dtype(nid2pid) in (F.int32, F.int64)
        assert np.all(F.asnumpy(nid2pid) == node_map)
        eid2pid = gpb.eid2partid(F.arange(0, len(edge_map)))
        assert F.dtype(eid2pid) in (F.int32, F.int64)
        assert np.all(F.asnumpy(eid2pid) == edge_map)
Da Zheng's avatar
Da Zheng committed
106
107
108
109
110

def test_partition():
    check_partition(True)
    check_partition(False)

111
112

if __name__ == '__main__':
Da Zheng's avatar
Da Zheng committed
113
    os.makedirs('/tmp/partition', exist_ok=True)
114
    test_partition()