verify_mag_partitions.py 7.43 KB
Newer Older
1
2
3
4
5
6
7
import os
import json
import numpy as np
import dgl
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset

8
9
10
partitions_folder = 'outputs'
graph_name = 'mag'
with open('{}/{}.json'.format(partitions_folder, graph_name)) as json_file:
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    metadata = json.load(json_file)
num_parts = metadata['num_parts']

# Load OGB-MAG.
dataset = DglNodePropPredDataset(name='ogbn-mag')
hg_orig, labels = dataset[0]
subgs = {}
for etype in hg_orig.canonical_etypes:
    u, v = hg_orig.all_edges(etype=etype)
    subgs[etype] = (u, v)
    subgs[(etype[2], 'rev-'+etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']

# Construct node data and edge data after reshuffling.
node_feats = {}
edge_feats = {}
for partid in range(num_parts):
29
30
31
32
    part_node_feats = dgl.data.utils.load_tensors(
        '{}/part{}/node_feat.dgl'.format(partitions_folder, partid))
    part_edge_feats = dgl.data.utils.load_tensors(
        '{}/part{}/edge_feat.dgl'.format(partitions_folder, partid))
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    for key in part_node_feats:
        if key in node_feats:
            node_feats[key].append(part_node_feats[key])
        else:
            node_feats[key] = [part_node_feats[key]]
    for key in part_edge_feats:
        if key in edge_feats:
            edge_feats[key].append(part_edge_feats[key])
        else:
            edge_feats[key] = [part_edge_feats[key]]
for key in node_feats:
    node_feats[key] = th.cat(node_feats[key])
for key in edge_feats:
    edge_feats[key] = th.cat(edge_feats[key])

ntype_map = metadata['ntypes']
ntypes = [None] * len(ntype_map)
for key in ntype_map:
    ntype_id = ntype_map[key]
    ntypes[ntype_id] = key
etype_map = metadata['etypes']
etypes = [None] * len(etype_map)
for key in etype_map:
    etype_id = etype_map[key]
    etypes[etype_id] = key

59
60
61
etype2canonical = {etype: (srctype, etype, dsttype)
                   for srctype, etype, dsttype in hg.canonical_etypes}

62
63
64
65
66
67
68
69
70
node_map = metadata['node_map']
for key in node_map:
    node_map[key] = th.stack([th.tensor(row) for row in node_map[key]], 0)
nid_map = dgl.distributed.id_map.IdMap(node_map)
edge_map = metadata['edge_map']
for key in edge_map:
    edge_map[key] = th.stack([th.tensor(row) for row in edge_map[key]], 0)
eid_map = dgl.distributed.id_map.IdMap(edge_map)

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
for ntype in node_map:
    assert hg.number_of_nodes(ntype) == th.sum(
        node_map[ntype][:, 1] - node_map[ntype][:, 0])
for etype in edge_map:
    assert hg.number_of_edges(etype) == th.sum(
        edge_map[etype][:, 1] - edge_map[etype][:, 0])

# verify part_0 with graph_partition_book
eid = []
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(0, num_parts, node_map, edge_map,
                                                              {ntype: i for i, ntype in enumerate(
                                                                  hg.ntypes)},
                                                              {etype: i for i, etype in enumerate(hg.etypes)})
subg0 = dgl.load_graphs('{}/part0/graph.dgl'.format(partitions_folder))[0][0]
for etype in hg.etypes:
    type_eid = th.zeros((1,), dtype=th.int64)
    eid.append(gpb.map_to_homo_eid(type_eid, etype))
eid = th.cat(eid)
part_id = gpb.eid2partid(eid)
assert th.all(part_id == 0)
local_eid = gpb.eid2localeid(eid, 0)
assert th.all(local_eid == eid)
assert th.all(subg0.edata[dgl.EID][local_eid] == eid)
lsrc, ldst = subg0.find_edges(local_eid)
gsrc, gdst = subg0.ndata[dgl.NID][lsrc], subg0.ndata[dgl.NID][ldst]
assert th.all(gsrc == lsrc)
# gdst which is not assigned into current partition is not required to equal ldst
assert th.all(th.logical_or(
    gdst == ldst, subg0.ndata['inner_node'][ldst] == 0))
etids, _ = gpb.map_to_per_etype(eid)
src_tids, _ = gpb.map_to_per_ntype(gsrc)
dst_tids, _ = gpb.map_to_per_ntype(gdst)
canonical_etypes = []
etype_ids = th.arange(0, len(etypes))
for src_tid, etype_id, dst_tid in zip(src_tids, etype_ids, dst_tids):
    canonical_etypes.append(
        (ntypes[src_tid], etypes[etype_id], ntypes[dst_tid]))
for etype in canonical_etypes:
    assert etype in hg.canonical_etypes

111
# Load the graph partition structure.
112
113
orig_node_ids = {ntype: [] for ntype in hg.ntypes}
orig_edge_ids = {etype: [] for etype in hg.etypes}
114
115
for partid in range(num_parts):
    print('test part', partid)
116
    part_file = '{}/part{}/graph.dgl'.format(partitions_folder, partid)
117
118
    subg = dgl.load_graphs(part_file)[0][0]
    subg_src_id, subg_dst_id = subg.edges()
119
120
121
122
    orig_src_id = subg.ndata['orig_id'][subg_src_id]
    orig_dst_id = subg.ndata['orig_id'][subg_dst_id]
    global_src_id = subg.ndata[dgl.NID][subg_src_id]
    global_dst_id = subg.ndata[dgl.NID][subg_dst_id]
123
124
125
126
127
128
129
130
131
    subg_ntype = subg.ndata[dgl.NTYPE]
    subg_etype = subg.edata[dgl.ETYPE]
    for ntype_id in th.unique(subg_ntype):
        ntype = ntypes[ntype_id]
        idx = subg_ntype == ntype_id
        # This is global IDs after reshuffle.
        nid = subg.ndata[dgl.NID][idx]
        ntype_ids1, type_nid = nid_map(nid)
        orig_type_nid = subg.ndata['orig_id'][idx]
132
        inner_node = subg.ndata['inner_node'][idx]
133
134
        # All nodes should have the same node type.
        assert np.all(ntype_ids1.numpy() == int(ntype_id))
135
136
137
        assert np.all(nid[inner_node == 1].numpy() == np.arange(
            node_map[ntype][partid, 0], node_map[ntype][partid, 1]))
        orig_node_ids[ntype].append(orig_type_nid[inner_node == 1])
138
139
140
141
142
143
144
145
146

        # Check node data.
        for name in hg.nodes[ntype].data:
            local_data = node_feats[ntype + '/' + name][type_nid]
            local_data1 = hg.nodes[ntype].data[name][orig_type_nid]
            assert np.all(local_data.numpy() == local_data1.numpy())

    for etype_id in th.unique(subg_etype):
        etype = etypes[etype_id]
147
        srctype, _, dsttype = etype2canonical[etype]
148
        idx = subg_etype == etype_id
149
        exist = hg[etype].has_edges_between(orig_src_id[idx], orig_dst_id[idx])
150
        assert np.all(exist.numpy())
151
        eid = hg[etype].edge_ids(orig_src_id[idx], orig_dst_id[idx])
152
153
        assert np.all(eid.numpy() == subg.edata['orig_id'][idx].numpy())

154
155
156
157
158
159
160
        ntype_ids, type_nid = nid_map(global_src_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == srctype
        ntype_ids, type_nid = nid_map(global_dst_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == dsttype

161
162
163
164
        # This is global IDs after reshuffle.
        eid = subg.edata[dgl.EID][idx]
        etype_ids1, type_eid = eid_map(eid)
        orig_type_eid = subg.edata['orig_id'][idx]
165
        inner_edge = subg.edata['inner_edge'][idx]
166
167
        # All edges should have the same edge type.
        assert np.all(etype_ids1.numpy() == int(etype_id))
168
169
170
        assert np.all(np.sort(eid[inner_edge == 1].numpy()) == np.arange(
            edge_map[etype][partid, 0], edge_map[etype][partid, 1]))
        orig_edge_ids[etype].append(orig_type_eid[inner_edge == 1])
171
172
173
174
175
176

        # Check edge data.
        for name in hg.edges[etype].data:
            local_data = edge_feats[etype + '/' + name][type_eid]
            local_data1 = hg.edges[etype].data[name][orig_type_eid]
            assert np.all(local_data.numpy() == local_data1.numpy())
177
178
179
180
181
182
183
184
185
186

for ntype in orig_node_ids:
    nids = th.cat(orig_node_ids[ntype])
    nids = th.sort(nids)[0]
    assert np.all((nids == th.arange(hg.number_of_nodes(ntype))).numpy())

for etype in orig_edge_ids:
    eids = th.cat(orig_edge_ids[etype])
    eids = th.sort(eids)[0]
    assert np.all((eids == th.arange(hg.number_of_edges(etype))).numpy())