"tests/test_datasets/test_waymo_dataset.py" did not exist on "6f1a268e1be715a700121921009a2e7d7f030704"
import json
import os

import dgl
import numpy as np
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset

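# Load the partition metadata: number of partitions, node/edge type IDs, and
# the per-partition node/edge ID ranges (node_map/edge_map).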
partitions_folder = "outputs"
graph_name = "mag"
with open("{}/{}.json".format(partitions_folder, graph_name)) as json_file:
    metadata = json.load(json_file)
num_parts = metadata["num_parts"]

# Load OGB-MAG.
dataset = DglNodePropPredDataset(name="ogbn-mag")
hg_orig, labels = dataset[0]
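# Rebuild the heterograph with an added reverse edge type ("rev-" + etype) for
# every canonical edge type, presumably to mirror the graph that was
# partitioned; only the paper features are carried over.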
subgs = {}
for etype in hg_orig.canonical_etypes:
    u, v = hg_orig.all_edges(etype=etype)
    subgs[etype] = (u, v)
    subgs[(etype[2], "rev-" + etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes["paper"].data["feat"] = hg_orig.nodes["paper"].data["feat"]

# Construct node data and edge data after reshuffling.
node_feats = {}
edge_feats = {}
for partid in range(num_parts):
    part_node_feats = dgl.data.utils.load_tensors(
        "{}/part{}/node_feat.dgl".format(partitions_folder, partid)
    )
    part_edge_feats = dgl.data.utils.load_tensors(
        "{}/part{}/edge_feat.dgl".format(partitions_folder, partid)
    )
    for key in part_node_feats:
        if key in node_feats:
            node_feats[key].append(part_node_feats[key])
        else:
            node_feats[key] = [part_node_feats[key]]
    for key in part_edge_feats:
        if key in edge_feats:
            edge_feats[key].append(part_edge_feats[key])
        else:
            edge_feats[key] = [part_edge_feats[key]]
for key in node_feats:
    node_feats[key] = th.cat(node_feats[key])
for key in edge_feats:
    edge_feats[key] = th.cat(edge_feats[key])

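# Recover the node and edge type names, indexed by their integer type IDs.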
ntype_map = metadata["ntypes"]
ntypes = [None] * len(ntype_map)
for key in ntype_map:
    ntype_id = ntype_map[key]
    ntypes[ntype_id] = key
etype_map = metadata["etypes"]
etypes = [None] * len(etype_map)
for key in etype_map:
    etype_id = etype_map[key]
    etypes[etype_id] = key

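# Map each edge type name to its canonical (srctype, etype, dsttype) triple.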
etype2canonical = {
    etype: (srctype, etype, dsttype)
    for srctype, etype, dsttype in hg.canonical_etypes
}

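# For every type, node_map/edge_map hold one [start, end) range of global
# (reshuffled) IDs per partition; IdMap converts a global ID back to a
# (type ID, per-type ID) pair.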
node_map = metadata["node_map"]
for key in node_map:
    node_map[key] = th.stack([th.tensor(row) for row in node_map[key]], 0)
nid_map = dgl.distributed.id_map.IdMap(node_map)
edge_map = metadata["edge_map"]
for key in edge_map:
    edge_map[key] = th.stack([th.tensor(row) for row in edge_map[key]], 0)
eid_map = dgl.distributed.id_map.IdMap(edge_map)

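# Sanity check: the per-partition ID ranges of each type must add up to the
# number of nodes/edges of that type in the original graph.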
for ntype in node_map:
    assert hg.number_of_nodes(ntype) == th.sum(
        node_map[ntype][:, 1] - node_map[ntype][:, 0]
    )
for etype in edge_map:
    assert hg.number_of_edges(etype) == th.sum(
        edge_map[etype][:, 1] - edge_map[etype][:, 0]
    )

# Verify partition 0 against a graph partition book built from the metadata.
eid = []
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
    0,
    num_parts,
    node_map,
    edge_map,
    {ntype: i for i, ntype in enumerate(hg.ntypes)},
    {etype: i for i, etype in enumerate(hg.etypes)},
)
subg0 = dgl.load_graphs("{}/part0/graph.dgl".format(partitions_folder))[0][0]
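# Map per-type edge ID 0 of every edge type to its homogeneous global edge ID
# and check that the partition book assigns all of them to partition 0, where
# local edge IDs coincide with the global ones.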
for etype in hg.etypes:
    type_eid = th.zeros((1,), dtype=th.int64)
    eid.append(gpb.map_to_homo_eid(type_eid, etype))
eid = th.cat(eid)
part_id = gpb.eid2partid(eid)
assert th.all(part_id == 0)
local_eid = gpb.eid2localeid(eid, 0)
assert th.all(local_eid == eid)
assert th.all(subg0.edata[dgl.EID][local_eid] == eid)
lsrc, ldst = subg0.find_edges(local_eid)
gsrc, gdst = subg0.ndata[dgl.NID][lsrc], subg0.ndata[dgl.NID][ldst]
# The destination nodes are owned by the partition.
assert th.all(gdst == ldst)
# A gdst that is not assigned to the current partition is not required to equal ldst.
assert th.all(th.logical_or(gdst == ldst, subg0.ndata["inner_node"][ldst] == 0))
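# Recover the canonical edge type of each probed edge from the node types of
# its endpoints and check that it exists in the original graph.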
etids, _ = gpb.map_to_per_etype(eid)
src_tids, _ = gpb.map_to_per_ntype(gsrc)
dst_tids, _ = gpb.map_to_per_ntype(gdst)
canonical_etypes = []
etype_ids = th.arange(0, len(etypes))
for src_tid, etype_id, dst_tid in zip(src_tids, etype_ids, dst_tids):
    canonical_etypes.append(
        (ntypes[src_tid], etypes[etype_id], ntypes[dst_tid])
    )
for etype in canonical_etypes:
    assert etype in hg.canonical_etypes

# Load each graph partition and verify its structure and data against the original graph.
orig_node_ids = {ntype: [] for ntype in hg.ntypes}
orig_edge_ids = {etype: [] for etype in hg.etypes}
for partid in range(num_parts):
    print("test part", partid)
    part_file = "{}/part{}/graph.dgl".format(partitions_folder, partid)
    subg = dgl.load_graphs(part_file)[0][0]
    subg_src_id, subg_dst_id = subg.edges()
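    # Per-type IDs in the original graph ("orig_id") and global IDs after
    # reshuffling (dgl.NID) for the endpoints of every edge in this partition.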
    orig_src_id = subg.ndata["orig_id"][subg_src_id]
    orig_dst_id = subg.ndata["orig_id"][subg_dst_id]
    global_src_id = subg.ndata[dgl.NID][subg_src_id]
    global_dst_id = subg.ndata[dgl.NID][subg_dst_id]
    subg_ntype = subg.ndata[dgl.NTYPE]
    subg_etype = subg.edata[dgl.ETYPE]
    for ntype_id in th.unique(subg_ntype):
        ntype = ntypes[ntype_id]
        idx = subg_ntype == ntype_id
        # These are the global IDs after reshuffling.
        nid = subg.ndata[dgl.NID][idx]
        ntype_ids1, type_nid = nid_map(nid)
        orig_type_nid = subg.ndata["orig_id"][idx]
        inner_node = subg.ndata["inner_node"][idx]
        # All nodes should have the same node type.
        assert np.all(ntype_ids1.numpy() == int(ntype_id))
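        # Inner nodes of this partition must occupy exactly the contiguous
        # global ID range recorded in node_map for this partition.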
        assert np.all(
            nid[inner_node == 1].numpy()
            == np.arange(node_map[ntype][partid, 0], node_map[ntype][partid, 1])
        )
        orig_node_ids[ntype].append(orig_type_nid[inner_node == 1])

        # Check the degree of the inner nodes.
        inner_nids = th.nonzero(
            th.logical_and(subg_ntype == ntype_id, subg.ndata["inner_node"]),
            as_tuple=True,
        )[0]
        subg_deg = subg.in_degrees(inner_nids)
        orig_nids = subg.ndata["orig_id"][inner_nids]
        # Calculate the in-degrees of nodes of a particular node type.
        glob_deg = th.zeros(len(subg_deg), dtype=th.int64)
        for etype in hg.canonical_etypes:
            dst_ntype = etype[2]
            if dst_ntype == ntype:
                glob_deg += hg.in_degrees(orig_nids, etype=etype)
        assert np.all(glob_deg.numpy() == subg_deg.numpy())

        # Check node data.
        for name in hg.nodes[ntype].data:
            local_data = node_feats[ntype + "/" + name][type_nid]
            local_data1 = hg.nodes[ntype].data[name][orig_type_nid]
            assert np.all(local_data.numpy() == local_data1.numpy())

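    # Every edge stored in this partition must exist in the original graph and
    # carry the matching original (per-type) edge ID.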
    for etype_id in th.unique(subg_etype):
        etype = etypes[etype_id]
        srctype, _, dsttype = etype2canonical[etype]
        idx = subg_etype == etype_id
        exist = hg[etype].has_edges_between(orig_src_id[idx], orig_dst_id[idx])
        assert np.all(exist.numpy())
        eid = hg[etype].edge_ids(orig_src_id[idx], orig_dst_id[idx])
        assert np.all(eid.numpy() == subg.edata["orig_id"][idx].numpy())

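        # The source and destination nodes of these edges must have the node
        # types given by the canonical edge type.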
        ntype_ids, type_nid = nid_map(global_src_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == srctype
        ntype_ids, type_nid = nid_map(global_dst_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == dsttype

        # These are the global IDs after reshuffling.
        eid = subg.edata[dgl.EID][idx]
        etype_ids1, type_eid = eid_map(eid)
        orig_type_eid = subg.edata["orig_id"][idx]
        inner_edge = subg.edata["inner_edge"][idx]
        # All edges should have the same edge type.
        assert np.all(etype_ids1.numpy() == int(etype_id))
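        # Inner edges of this partition must occupy exactly the contiguous
        # global ID range recorded in edge_map for this partition.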
        assert np.all(
            np.sort(eid[inner_edge == 1].numpy())
            == np.arange(edge_map[etype][partid, 0], edge_map[etype][partid, 1])
        )
        orig_edge_ids[etype].append(orig_type_eid[inner_edge == 1])

        # Check edge data.
        for name in hg.edges[etype].data:
            local_data = edge_feats[etype + "/" + name][type_eid]
            local_data1 = hg.edges[etype].data[name][orig_type_eid]
            assert np.all(local_data.numpy() == local_data1.numpy())

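# Across all partitions, the inner nodes and inner edges of each type must
# cover every original ID exactly once.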
for ntype in orig_node_ids:
    nids = th.cat(orig_node_ids[ntype])
    nids = th.sort(nids)[0]
    assert np.all((nids == th.arange(hg.number_of_nodes(ntype))).numpy())

for etype in orig_edge_ids:
    eids = th.cat(orig_edge_ids[etype])
    eids = th.sort(eids)[0]
    assert np.all((eids == th.arange(hg.number_of_edges(etype))).numpy())