verify_mag_partitions.py 8.13 KB
Newer Older
1
2
3
4
5
6
7
import os
import json
import numpy as np
import dgl
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset

8
9
10
partitions_folder = 'outputs'
graph_name = 'mag'
with open('{}/{}.json'.format(partitions_folder, graph_name)) as json_file:
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    metadata = json.load(json_file)
num_parts = metadata['num_parts']

# Load OGB-MAG.
dataset = DglNodePropPredDataset(name='ogbn-mag')
hg_orig, labels = dataset[0]
subgs = {}
for etype in hg_orig.canonical_etypes:
    u, v = hg_orig.all_edges(etype=etype)
    subgs[etype] = (u, v)
    subgs[(etype[2], 'rev-'+etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']

# Construct node data and edge data after reshuffling.
node_feats = {}
edge_feats = {}
for partid in range(num_parts):
29
30
31
32
    part_node_feats = dgl.data.utils.load_tensors(
        '{}/part{}/node_feat.dgl'.format(partitions_folder, partid))
    part_edge_feats = dgl.data.utils.load_tensors(
        '{}/part{}/edge_feat.dgl'.format(partitions_folder, partid))
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    for key in part_node_feats:
        if key in node_feats:
            node_feats[key].append(part_node_feats[key])
        else:
            node_feats[key] = [part_node_feats[key]]
    for key in part_edge_feats:
        if key in edge_feats:
            edge_feats[key].append(part_edge_feats[key])
        else:
            edge_feats[key] = [part_edge_feats[key]]
for key in node_feats:
    node_feats[key] = th.cat(node_feats[key])
for key in edge_feats:
    edge_feats[key] = th.cat(edge_feats[key])

ntype_map = metadata['ntypes']
ntypes = [None] * len(ntype_map)
for key in ntype_map:
    ntype_id = ntype_map[key]
    ntypes[ntype_id] = key
etype_map = metadata['etypes']
etypes = [None] * len(etype_map)
for key in etype_map:
    etype_id = etype_map[key]
    etypes[etype_id] = key

59
60
61
etype2canonical = {etype: (srctype, etype, dsttype)
                   for srctype, etype, dsttype in hg.canonical_etypes}

62
63
64
65
66
67
68
69
70
node_map = metadata['node_map']
for key in node_map:
    node_map[key] = th.stack([th.tensor(row) for row in node_map[key]], 0)
nid_map = dgl.distributed.id_map.IdMap(node_map)
edge_map = metadata['edge_map']
for key in edge_map:
    edge_map[key] = th.stack([th.tensor(row) for row in edge_map[key]], 0)
eid_map = dgl.distributed.id_map.IdMap(edge_map)

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
for ntype in node_map:
    assert hg.number_of_nodes(ntype) == th.sum(
        node_map[ntype][:, 1] - node_map[ntype][:, 0])
for etype in edge_map:
    assert hg.number_of_edges(etype) == th.sum(
        edge_map[etype][:, 1] - edge_map[etype][:, 0])

# verify part_0 with graph_partition_book
eid = []
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(0, num_parts, node_map, edge_map,
                                                              {ntype: i for i, ntype in enumerate(
                                                                  hg.ntypes)},
                                                              {etype: i for i, etype in enumerate(hg.etypes)})
subg0 = dgl.load_graphs('{}/part0/graph.dgl'.format(partitions_folder))[0][0]
for etype in hg.etypes:
    type_eid = th.zeros((1,), dtype=th.int64)
    eid.append(gpb.map_to_homo_eid(type_eid, etype))
eid = th.cat(eid)
part_id = gpb.eid2partid(eid)
assert th.all(part_id == 0)
local_eid = gpb.eid2localeid(eid, 0)
assert th.all(local_eid == eid)
assert th.all(subg0.edata[dgl.EID][local_eid] == eid)
lsrc, ldst = subg0.find_edges(local_eid)
gsrc, gdst = subg0.ndata[dgl.NID][lsrc], subg0.ndata[dgl.NID][ldst]
Da Zheng's avatar
Da Zheng committed
96
97
# The destination nodes are owned by the partition.
assert th.all(gdst == ldst)
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# gdst which is not assigned into current partition is not required to equal ldst
assert th.all(th.logical_or(
    gdst == ldst, subg0.ndata['inner_node'][ldst] == 0))
etids, _ = gpb.map_to_per_etype(eid)
src_tids, _ = gpb.map_to_per_ntype(gsrc)
dst_tids, _ = gpb.map_to_per_ntype(gdst)
canonical_etypes = []
etype_ids = th.arange(0, len(etypes))
for src_tid, etype_id, dst_tid in zip(src_tids, etype_ids, dst_tids):
    canonical_etypes.append(
        (ntypes[src_tid], etypes[etype_id], ntypes[dst_tid]))
for etype in canonical_etypes:
    assert etype in hg.canonical_etypes

112
# Load the graph partition structure.
113
114
orig_node_ids = {ntype: [] for ntype in hg.ntypes}
orig_edge_ids = {etype: [] for etype in hg.etypes}
115
116
for partid in range(num_parts):
    print('test part', partid)
117
    part_file = '{}/part{}/graph.dgl'.format(partitions_folder, partid)
118
119
    subg = dgl.load_graphs(part_file)[0][0]
    subg_src_id, subg_dst_id = subg.edges()
120
121
122
123
    orig_src_id = subg.ndata['orig_id'][subg_src_id]
    orig_dst_id = subg.ndata['orig_id'][subg_dst_id]
    global_src_id = subg.ndata[dgl.NID][subg_src_id]
    global_dst_id = subg.ndata[dgl.NID][subg_dst_id]
124
125
126
127
128
129
130
131
132
    subg_ntype = subg.ndata[dgl.NTYPE]
    subg_etype = subg.edata[dgl.ETYPE]
    for ntype_id in th.unique(subg_ntype):
        ntype = ntypes[ntype_id]
        idx = subg_ntype == ntype_id
        # This is global IDs after reshuffle.
        nid = subg.ndata[dgl.NID][idx]
        ntype_ids1, type_nid = nid_map(nid)
        orig_type_nid = subg.ndata['orig_id'][idx]
133
        inner_node = subg.ndata['inner_node'][idx]
134
135
        # All nodes should have the same node type.
        assert np.all(ntype_ids1.numpy() == int(ntype_id))
136
137
138
        assert np.all(nid[inner_node == 1].numpy() == np.arange(
            node_map[ntype][partid, 0], node_map[ntype][partid, 1]))
        orig_node_ids[ntype].append(orig_type_nid[inner_node == 1])
139

Da Zheng's avatar
Da Zheng committed
140
141
142
143
144
145
146
147
148
149
150
151
152
        # Check the degree of the inner nodes.
        inner_nids = th.nonzero(th.logical_and(subg_ntype == ntype_id, subg.ndata['inner_node']),
                                as_tuple=True)[0]
        subg_deg = subg.in_degrees(inner_nids)
        orig_nids = subg.ndata['orig_id'][inner_nids]
        # Calculate the in-degrees of nodes of a particular node type.
        glob_deg = th.zeros(len(subg_deg), dtype=th.int64)
        for etype in hg.canonical_etypes:
            dst_ntype = etype[2]
            if dst_ntype == ntype:
                glob_deg += hg.in_degrees(orig_nids, etype=etype)
        assert np.all(glob_deg.numpy() == subg_deg.numpy())

153
154
155
156
157
158
159
160
        # Check node data.
        for name in hg.nodes[ntype].data:
            local_data = node_feats[ntype + '/' + name][type_nid]
            local_data1 = hg.nodes[ntype].data[name][orig_type_nid]
            assert np.all(local_data.numpy() == local_data1.numpy())

    for etype_id in th.unique(subg_etype):
        etype = etypes[etype_id]
161
        srctype, _, dsttype = etype2canonical[etype]
162
        idx = subg_etype == etype_id
163
        exist = hg[etype].has_edges_between(orig_src_id[idx], orig_dst_id[idx])
164
        assert np.all(exist.numpy())
165
        eid = hg[etype].edge_ids(orig_src_id[idx], orig_dst_id[idx])
166
167
        assert np.all(eid.numpy() == subg.edata['orig_id'][idx].numpy())

168
169
170
171
172
173
174
        ntype_ids, type_nid = nid_map(global_src_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == srctype
        ntype_ids, type_nid = nid_map(global_dst_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == dsttype

175
176
177
178
        # This is global IDs after reshuffle.
        eid = subg.edata[dgl.EID][idx]
        etype_ids1, type_eid = eid_map(eid)
        orig_type_eid = subg.edata['orig_id'][idx]
179
        inner_edge = subg.edata['inner_edge'][idx]
180
181
        # All edges should have the same edge type.
        assert np.all(etype_ids1.numpy() == int(etype_id))
182
183
184
        assert np.all(np.sort(eid[inner_edge == 1].numpy()) == np.arange(
            edge_map[etype][partid, 0], edge_map[etype][partid, 1]))
        orig_edge_ids[etype].append(orig_type_eid[inner_edge == 1])
185
186
187
188
189
190

        # Check edge data.
        for name in hg.edges[etype].data:
            local_data = edge_feats[etype + '/' + name][type_eid]
            local_data1 = hg.edges[etype].data[name][orig_type_eid]
            assert np.all(local_data.numpy() == local_data1.numpy())
191
192
193
194
195
196
197
198
199
200

for ntype in orig_node_ids:
    nids = th.cat(orig_node_ids[ntype])
    nids = th.sort(nids)[0]
    assert np.all((nids == th.arange(hg.number_of_nodes(ntype))).numpy())

for etype in orig_edge_ids:
    eids = th.cat(orig_edge_ids[etype])
    eids = th.sort(eids)[0]
    assert np.all((eids == th.arange(hg.number_of_edges(etype))).numpy())