import json
import os
import tempfile

import backend as F
import dgl
import numpy as np
import pytest
import torch as th
from dgl import function as fn
from dgl.distributed import (
    convert_dgl_partition_to_csc_sampling_graph,
    load_partition,
    load_partition_book,
    load_partition_feats,
    partition_graph,
)
from dgl.distributed.graph_partition_book import (
    _etype_tuple_to_str,
    DEFAULT_ETYPE,
    DEFAULT_NTYPE,
    EdgePartitionPolicy,
    HeteroDataName,
    NodePartitionPolicy,
    RangePartitionBook,
)
from dgl.distributed.partition import (
    _get_inner_edge_mask,
    _get_inner_node_mask,
    RESERVED_FIELD_DTYPE,
)
from scipy import sparse as spsp
from utils import reset_envs


def _verify_partition_data_types(part_g):
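    """Check that reserved ndata/edata fields use RESERVED_FIELD_DTYPE dtypes."""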
    for k, dtype in RESERVED_FIELD_DTYPE.items():
        if k in part_g.ndata:
            assert part_g.ndata[k].dtype == dtype
        if k in part_g.edata:
            assert part_g.edata[k].dtype == dtype


def _verify_partition_formats(part_g, formats):
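    """Check that the partition was saved with the requested graph formats."""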
    # verify saved graph formats
    if formats is None:
        assert "coo" in part_g.formats()["created"]
    else:
        for fmt in formats:
            assert fmt in part_g.formats()["created"]


def create_random_graph(n):
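    """Create a random homogeneous graph with n nodes and ~0.1% edge density."""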
    arr = (
        spsp.random(n, n, density=0.001, format="coo", random_state=100) != 0
    ).astype(np.int64)
    return dgl.from_scipy(arr)


def create_random_hetero():
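    """Create a random heterogeneous graph with 3 node types and 4 edge types."""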
    num_nodes = {"n1": 1000, "n2": 1010, "n3": 1020}
    etypes = [
        ("n1", "r1", "n2"),
        ("n2", "r1", "n1"),
        ("n1", "r2", "n3"),
        ("n2", "r3", "n3"),
    ]
    edges = {}
    for etype in etypes:
        src_ntype, _, dst_ntype = etype
        arr = spsp.random(
            num_nodes[src_ntype],
            num_nodes[dst_ntype],
            density=0.001,
            format="coo",
            random_state=100,
        )
        edges[etype] = (arr.row, arr.col)
    return dgl.heterograph(edges, num_nodes)


def verify_hetero_graph(g, parts):
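    """Verify per-type node/edge counts, ID ranges, and ID coverage of parts."""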
    num_nodes = {ntype: 0 for ntype in g.ntypes}
    num_edges = {etype: 0 for etype in g.canonical_etypes}
    for part in parts:
        assert len(g.ntypes) == len(F.unique(part.ndata[dgl.NTYPE]))
        assert len(g.canonical_etypes) == len(F.unique(part.edata[dgl.ETYPE]))
        for ntype in g.ntypes:
            ntype_id = g.get_ntype_id(ntype)
            inner_node_mask = _get_inner_node_mask(part, ntype_id)
            num_inner_nodes = F.sum(F.astype(inner_node_mask, F.int64), 0)
            num_nodes[ntype] += num_inner_nodes
        for etype in g.canonical_etypes:
            etype_id = g.get_etype_id(etype)
            inner_edge_mask = _get_inner_edge_mask(part, etype_id)
            num_inner_edges = F.sum(F.astype(inner_edge_mask, F.int64), 0)
            num_edges[etype] += num_inner_edges
    # Verify that the number of nodes is correct.
    for ntype in g.ntypes:
        print(
            "node {}: {}, {}".format(
                ntype, g.num_nodes(ntype), num_nodes[ntype]
            )
        )
        assert g.num_nodes(ntype) == num_nodes[ntype]
    # Verify that the number of edges is correct.
    for etype in g.canonical_etypes:
        print(
            "edge {}: {}, {}".format(
                etype, g.num_edges(etype), num_edges[etype]
            )
        )
        assert g.num_edges(etype) == num_edges[etype]

    nids = {ntype: [] for ntype in g.ntypes}
    eids = {etype: [] for etype in g.canonical_etypes}
    for part in parts:
        _, _, eid = part.edges(form="all")
        etype_arr = F.gather_row(part.edata[dgl.ETYPE], eid)
        eid_type = F.gather_row(part.edata[dgl.EID], eid)
        for etype in g.canonical_etypes:
            etype_id = g.get_etype_id(etype)
            eids[etype].append(F.boolean_mask(eid_type, etype_arr == etype_id))
            # Make sure the edge IDs fall into a contiguous range.
            inner_edge_mask = _get_inner_edge_mask(part, etype_id)
            inner_eids = np.sort(
                F.asnumpy(F.boolean_mask(part.edata[dgl.EID], inner_edge_mask))
            )
            assert np.all(
                inner_eids == np.arange(inner_eids[0], inner_eids[-1] + 1)
            )

        for ntype in g.ntypes:
            ntype_id = g.get_ntype_id(ntype)
            # Make sure the IDs of inner nodes fall into a contiguous range.
            inner_node_mask = _get_inner_node_mask(part, ntype_id)
            inner_nids = F.boolean_mask(part.ndata[dgl.NID], inner_node_mask)
            assert np.all(
                F.asnumpy(
                    inner_nids
                    == F.arange(
                        F.as_scalar(inner_nids[0]),
                        F.as_scalar(inner_nids[-1]) + 1,
                    )
                )
            )
            nids[ntype].append(inner_nids)

    for ntype in nids:
        nids_type = F.cat(nids[ntype], 0)
        uniq_ids = F.unique(nids_type)
        # We should get all nodes.
        assert len(uniq_ids) == g.num_nodes(ntype)
    for etype in eids:
        eids_type = F.cat(eids[etype], 0)
        uniq_ids = F.unique(eids_type)
        assert len(uniq_ids) == g.num_edges(etype)
    # TODO(zhengda) this doesn't check 'part_id'


def verify_graph_feats(
    g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
):
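    """Verify that node/edge features stored in a partition match the graph."""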
    for ntype in g.ntypes:
        ntype_id = g.get_ntype_id(ntype)
        inner_node_mask = _get_inner_node_mask(part, ntype_id)
        inner_nids = F.boolean_mask(part.ndata[dgl.NID], inner_node_mask)
        ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
        partid = gpb.nid2partid(inner_type_nids, ntype)
        assert np.all(F.asnumpy(ntype_ids) == ntype_id)
        assert np.all(F.asnumpy(partid) == gpb.partid)

        orig_id = orig_nids[ntype][inner_type_nids]
        local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)

        for name in g.nodes[ntype].data:
            if name in [dgl.NID, "inner_node"]:
                continue
            true_feats = F.gather_row(g.nodes[ntype].data[name], orig_id)
            ndata = F.gather_row(node_feats[ntype + "/" + name], local_nids)
            assert np.all(F.asnumpy(ndata == true_feats))

    for etype in g.canonical_etypes:
        etype_id = g.get_etype_id(etype)
        inner_edge_mask = _get_inner_edge_mask(part, etype_id)
        inner_eids = F.boolean_mask(part.edata[dgl.EID], inner_edge_mask)
        etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
        partid = gpb.eid2partid(inner_type_eids, etype)
        assert np.all(F.asnumpy(etype_ids) == etype_id)
        assert np.all(F.asnumpy(partid) == gpb.partid)

        orig_id = orig_eids[etype][inner_type_eids]
        local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)

        for name in g.edges[etype].data:
            if name in [dgl.EID, "inner_edge"]:
                continue
            true_feats = F.gather_row(g.edges[etype].data[name], orig_id)
            edata = F.gather_row(
                edge_feats[_etype_tuple_to_str(etype) + "/" + name], local_eids
            )
            assert np.all(F.asnumpy(edata == true_feats))


def check_hetero_partition(
    hg,
    part_method,
    num_parts=4,
    num_trainers_per_machine=1,
    load_feats=True,
    graph_formats=None,
):
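    """Partition a heterogeneous graph, then reload and verify every partition."""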
    test_ntype = "n1"
    test_etype = ("n1", "r1", "n2")
    hg.nodes[test_ntype].data["labels"] = F.arange(0, hg.num_nodes(test_ntype))
    hg.nodes[test_ntype].data["feats"] = F.tensor(
        np.random.randn(hg.num_nodes(test_ntype), 10), F.float32
    )
    hg.edges[test_etype].data["feats"] = F.tensor(
        np.random.randn(hg.num_edges(test_etype), 10), F.float32
    )
    hg.edges[test_etype].data["labels"] = F.arange(0, hg.num_edges(test_etype))
    num_hops = 1

    orig_nids, orig_eids = partition_graph(
        hg,
        "test",
        num_parts,
        "/tmp/partition",
        num_hops=num_hops,
        part_method=part_method,
        return_mapping=True,
        num_trainers_per_machine=num_trainers_per_machine,
        graph_formats=graph_formats,
    )
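    # partition_graph writes the partition metadata to /tmp/partition/test.json
    # plus one part* subdirectory per partition; load_partition below reads
    # each partition back through that JSON file.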
    assert len(orig_nids) == len(hg.ntypes)
    assert len(orig_eids) == len(hg.canonical_etypes)
    for ntype in hg.ntypes:
        assert len(orig_nids[ntype]) == hg.num_nodes(ntype)
    for etype in hg.canonical_etypes:
        assert len(orig_eids[etype]) == hg.num_edges(etype)
    parts = []
    shuffled_labels = []
    shuffled_elabels = []
    for i in range(num_parts):
        part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition(
            "/tmp/partition/test.json", i, load_feats=load_feats
        )
        _verify_partition_data_types(part_g)
        _verify_partition_formats(part_g, graph_formats)
        if not load_feats:
            assert not node_feats
            assert not edge_feats
            node_feats, edge_feats = load_partition_feats(
                "/tmp/partition/test.json", i
            )
        if num_trainers_per_machine > 1:
            for ntype in hg.ntypes:
                name = ntype + "/trainer_id"
                assert name in node_feats
                part_ids = F.floor_div(
                    node_feats[name], num_trainers_per_machine
                )
                assert np.all(F.asnumpy(part_ids) == i)

            for etype in hg.canonical_etypes:
                name = _etype_tuple_to_str(etype) + "/trainer_id"
                assert name in edge_feats
                part_ids = F.floor_div(
                    edge_feats[name], num_trainers_per_machine
                )
                assert np.all(F.asnumpy(part_ids) == i)
        # Verify the mapping between the reshuffled IDs and the original IDs.
        # These are partition-local IDs.
        part_src_ids, part_dst_ids = part_g.edges()
        # These are reshuffled global homogeneous IDs.
        part_src_ids = F.gather_row(part_g.ndata[dgl.NID], part_src_ids)
        part_dst_ids = F.gather_row(part_g.ndata[dgl.NID], part_dst_ids)
        part_eids = part_g.edata[dgl.EID]
        # These are reshuffled per-type IDs.
        src_ntype_ids, part_src_ids = gpb.map_to_per_ntype(part_src_ids)
        dst_ntype_ids, part_dst_ids = gpb.map_to_per_ntype(part_dst_ids)
        etype_ids, part_eids = gpb.map_to_per_etype(part_eids)
        # These are original per-type IDs.
        for etype_id, etype in enumerate(hg.canonical_etypes):
            part_src_ids1 = F.boolean_mask(part_src_ids, etype_ids == etype_id)
            src_ntype_ids1 = F.boolean_mask(
                src_ntype_ids, etype_ids == etype_id
            )
            part_dst_ids1 = F.boolean_mask(part_dst_ids, etype_ids == etype_id)
            dst_ntype_ids1 = F.boolean_mask(
                dst_ntype_ids, etype_ids == etype_id
            )
            part_eids1 = F.boolean_mask(part_eids, etype_ids == etype_id)
            assert np.all(F.asnumpy(src_ntype_ids1 == src_ntype_ids1[0]))
            assert np.all(F.asnumpy(dst_ntype_ids1 == dst_ntype_ids1[0]))
            src_ntype = hg.ntypes[F.as_scalar(src_ntype_ids1[0])]
            dst_ntype = hg.ntypes[F.as_scalar(dst_ntype_ids1[0])]
            orig_src_ids1 = F.gather_row(orig_nids[src_ntype], part_src_ids1)
            orig_dst_ids1 = F.gather_row(orig_nids[dst_ntype], part_dst_ids1)
            orig_eids1 = F.gather_row(orig_eids[etype], part_eids1)
            orig_eids2 = hg.edge_ids(orig_src_ids1, orig_dst_ids1, etype=etype)
            assert len(orig_eids1) == len(orig_eids2)
            assert np.all(F.asnumpy(orig_eids1) == F.asnumpy(orig_eids2))
        parts.append(part_g)
        verify_graph_feats(
            hg, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
        )

        shuffled_labels.append(node_feats[test_ntype + "/labels"])
        shuffled_elabels.append(
            edge_feats[_etype_tuple_to_str(test_etype) + "/labels"]
        )
    verify_hetero_graph(hg, parts)

    shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
    shuffled_elabels = F.asnumpy(F.cat(shuffled_elabels, 0))
    orig_labels = np.zeros(shuffled_labels.shape, dtype=shuffled_labels.dtype)
    orig_elabels = np.zeros(
        shuffled_elabels.shape, dtype=shuffled_elabels.dtype
    )
    orig_labels[F.asnumpy(orig_nids[test_ntype])] = shuffled_labels
    orig_elabels[F.asnumpy(orig_eids[test_etype])] = shuffled_elabels
    assert np.all(orig_labels == F.asnumpy(hg.nodes[test_ntype].data["labels"]))
    assert np.all(
        orig_elabels == F.asnumpy(hg.edges[test_etype].data["labels"])
    )


def check_partition(
    g,
    part_method,
    num_parts=4,
    num_trainers_per_machine=1,
    load_feats=True,
    graph_formats=None,
):
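    """Partition a homogeneous graph, then reload and verify every partition."""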
    g.ndata["labels"] = F.arange(0, g.num_nodes())
    g.ndata["feats"] = F.tensor(np.random.randn(g.num_nodes(), 10), F.float32)
    g.edata["feats"] = F.tensor(np.random.randn(g.num_edges(), 10), F.float32)
    g.update_all(fn.copy_u("feats", "msg"), fn.sum("msg", "h"))
    g.update_all(fn.copy_e("feats", "msg"), fn.sum("msg", "eh"))
    num_hops = 2

    orig_nids, orig_eids = partition_graph(
        g,
        "test",
        num_parts,
        "/tmp/partition",
        num_hops=num_hops,
        part_method=part_method,
        return_mapping=True,
        num_trainers_per_machine=num_trainers_per_machine,
        graph_formats=graph_formats,
    )
    part_sizes = []
    shuffled_labels = []
    shuffled_edata = []
    for i in range(num_parts):
        part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
            "/tmp/partition/test.json", i, load_feats=load_feats
        )
        _verify_partition_data_types(part_g)
        _verify_partition_formats(part_g, graph_formats)
        if not load_feats:
            assert not node_feats
            assert not edge_feats
            node_feats, edge_feats = load_partition_feats(
                "/tmp/partition/test.json", i
            )
        if num_trainers_per_machine > 1:
            for ntype in g.ntypes:
                name = ntype + "/trainer_id"
                assert name in node_feats
                part_ids = F.floor_div(
                    node_feats[name], num_trainers_per_machine
                )
                assert np.all(F.asnumpy(part_ids) == i)

            for etype in g.canonical_etypes:
                name = _etype_tuple_to_str(etype) + "/trainer_id"
                assert name in edge_feats
                part_ids = F.floor_div(
                    edge_feats[name], num_trainers_per_machine
                )
                assert np.all(F.asnumpy(part_ids) == i)

        # Check the metadata
        assert gpb._num_nodes() == g.num_nodes()
        assert gpb._num_edges() == g.num_edges()

        assert gpb.num_partitions() == num_parts
        gpb_meta = gpb.metadata()
        assert len(gpb_meta) == num_parts
        assert len(gpb.partid2nids(i)) == gpb_meta[i]["num_nodes"]
        assert len(gpb.partid2eids(i)) == gpb_meta[i]["num_edges"]
        part_sizes.append((gpb_meta[i]["num_nodes"], gpb_meta[i]["num_edges"]))

        nid = F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata["inner_node"])
        local_nid = gpb.nid2localnid(nid, i)
        assert F.dtype(local_nid) in (F.int64, F.int32)
        assert np.all(F.asnumpy(local_nid) == np.arange(0, len(local_nid)))
        eid = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata["inner_edge"])
        local_eid = gpb.eid2localeid(eid, i)
        assert F.dtype(local_eid) in (F.int64, F.int32)
        assert np.all(F.asnumpy(local_eid) == np.arange(0, len(local_eid)))

        # Check the node map.
        local_nodes = F.boolean_mask(
            part_g.ndata[dgl.NID], part_g.ndata["inner_node"]
        )
        llocal_nodes = F.nonzero_1d(part_g.ndata["inner_node"])
        local_nodes1 = gpb.partid2nids(i)
        assert F.dtype(local_nodes1) in (F.int32, F.int64)
        assert np.all(
            np.sort(F.asnumpy(local_nodes)) == np.sort(F.asnumpy(local_nodes1))
        )
        assert np.all(F.asnumpy(llocal_nodes) == np.arange(len(llocal_nodes)))

        # Check the edge map.
        local_edges = F.boolean_mask(
            part_g.edata[dgl.EID], part_g.edata["inner_edge"]
        )
        llocal_edges = F.nonzero_1d(part_g.edata["inner_edge"])
        local_edges1 = gpb.partid2eids(i)
        assert F.dtype(local_edges1) in (F.int32, F.int64)
        assert np.all(
            np.sort(F.asnumpy(local_edges)) == np.sort(F.asnumpy(local_edges1))
        )
        assert np.all(F.asnumpy(llocal_edges) == np.arange(len(llocal_edges)))

        # Verify the mapping between the reshuffled IDs and the original IDs.
        part_src_ids, part_dst_ids = part_g.edges()
        part_src_ids = F.gather_row(part_g.ndata[dgl.NID], part_src_ids)
        part_dst_ids = F.gather_row(part_g.ndata[dgl.NID], part_dst_ids)
        part_eids = part_g.edata[dgl.EID]
        orig_src_ids = F.gather_row(orig_nids, part_src_ids)
        orig_dst_ids = F.gather_row(orig_nids, part_dst_ids)
        orig_eids1 = F.gather_row(orig_eids, part_eids)
        orig_eids2 = g.edge_ids(orig_src_ids, orig_dst_ids)
        assert F.shape(orig_eids1)[0] == F.shape(orig_eids2)[0]
        assert np.all(F.asnumpy(orig_eids1) == F.asnumpy(orig_eids2))

        local_orig_nids = orig_nids[part_g.ndata[dgl.NID]]
        local_orig_eids = orig_eids[part_g.edata[dgl.EID]]
        part_g.ndata["feats"] = F.gather_row(g.ndata["feats"], local_orig_nids)
        part_g.edata["feats"] = F.gather_row(g.edata["feats"], local_orig_eids)
        local_nodes = orig_nids[local_nodes]
        local_edges = orig_eids[local_edges]

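        # Rerun message passing on the partition and compare the results with
        # those computed on the corresponding nodes of the original graph.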
        part_g.update_all(fn.copy_u("feats", "msg"), fn.sum("msg", "h"))
        part_g.update_all(fn.copy_e("feats", "msg"), fn.sum("msg", "eh"))
        assert F.allclose(
            F.gather_row(g.ndata["h"], local_nodes),
            F.gather_row(part_g.ndata["h"], llocal_nodes),
        )
        assert F.allclose(
            F.gather_row(g.ndata["eh"], local_nodes),
            F.gather_row(part_g.ndata["eh"], llocal_nodes),
        )

        for name in ["labels", "feats"]:
            assert "_N/" + name in node_feats
            assert node_feats["_N/" + name].shape[0] == len(local_nodes)
            true_feats = F.gather_row(g.ndata[name], local_nodes)
            ndata = F.gather_row(node_feats["_N/" + name], local_nid)
            assert np.all(F.asnumpy(true_feats) == F.asnumpy(ndata))
        for name in ["feats"]:
            efeat_name = _etype_tuple_to_str(DEFAULT_ETYPE) + "/" + name
            assert efeat_name in edge_feats
            assert edge_feats[efeat_name].shape[0] == len(local_edges)
            true_feats = F.gather_row(g.edata[name], local_edges)
            edata = F.gather_row(edge_feats[efeat_name], local_eid)
            assert np.all(F.asnumpy(true_feats) == F.asnumpy(edata))

        # This only works if node/edge IDs are shuffled.
        shuffled_labels.append(node_feats["_N/labels"])
        shuffled_edata.append(edge_feats["_N:_E:_N/feats"])

    # Verify that we can reconstruct node/edge data for original IDs.
    shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
    shuffled_edata = F.asnumpy(F.cat(shuffled_edata, 0))
    orig_labels = np.zeros(shuffled_labels.shape, dtype=shuffled_labels.dtype)
    orig_edata = np.zeros(shuffled_edata.shape, dtype=shuffled_edata.dtype)
    orig_labels[F.asnumpy(orig_nids)] = shuffled_labels
    orig_edata[F.asnumpy(orig_eids)] = shuffled_edata
    assert np.all(orig_labels == F.asnumpy(g.ndata["labels"]))
    assert np.all(orig_edata == F.asnumpy(g.edata["feats"]))

    node_map = []
    edge_map = []
    for i, (num_nodes, num_edges) in enumerate(part_sizes):
        node_map.append(np.ones(num_nodes) * i)
        edge_map.append(np.ones(num_edges) * i)
    node_map = np.concatenate(node_map)
    edge_map = np.concatenate(edge_map)
    nid2pid = gpb.nid2partid(F.arange(0, len(node_map)))
    assert F.dtype(nid2pid) in (F.int32, F.int64)
    assert np.all(F.asnumpy(nid2pid) == node_map)
    eid2pid = gpb.eid2partid(F.arange(0, len(edge_map)))
    assert F.dtype(eid2pid) in (F.int32, F.int64)
    assert np.all(F.asnumpy(eid2pid) == edge_map)


@pytest.mark.parametrize("part_method", ["metis", "random"])
@pytest.mark.parametrize("num_parts", [1, 4])
@pytest.mark.parametrize("num_trainers_per_machine", [1, 4])
@pytest.mark.parametrize("load_feats", [True, False])
@pytest.mark.parametrize(
    "graph_formats", [None, ["csc"], ["coo", "csc"], ["coo", "csc", "csr"]]
)
def test_partition(
    part_method,
    num_parts,
    num_trainers_per_machine,
    load_feats,
    graph_formats,
):
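    """End-to-end partitioning test over homogeneous and heterogeneous graphs."""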
    os.environ["DGL_DIST_DEBUG"] = "1"
    if part_method == "random" and num_parts > 1:
        num_trainers_per_machine = 1
    g = create_random_graph(1000)
    check_partition(
        g,
        part_method,
        num_parts,
        num_trainers_per_machine,
        load_feats,
        graph_formats,
    )
    hg = create_random_hetero()
    check_hetero_partition(
        hg,
        part_method,
        num_parts,
        num_trainers_per_machine,
        load_feats,
        graph_formats,
    )
    reset_envs()


def test_RangePartitionBook():
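    """Exercise RangePartitionBook and the node/edge partition policies."""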
    part_id = 1
    num_parts = 2

    # homogeneous
    node_map = {DEFAULT_NTYPE: F.tensor([[0, 1000], [1000, 2000]])}
    edge_map = {DEFAULT_ETYPE: F.tensor([[0, 5000], [5000, 10000]])}
    ntypes = {DEFAULT_NTYPE: 0}
    etypes = {DEFAULT_ETYPE: 0}
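    # Each map stores one [start, end) range of global IDs per partition:
    # nodes [0, 1000) fall in partition 0, nodes [1000, 2000) in partition 1.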
    gpb = RangePartitionBook(
        part_id, num_parts, node_map, edge_map, ntypes, etypes
    )
    assert gpb.etypes == [DEFAULT_ETYPE[1]]
    assert gpb.canonical_etypes == [DEFAULT_ETYPE]
    assert gpb.to_canonical_etype(DEFAULT_ETYPE[1]) == DEFAULT_ETYPE

    node_policy = NodePartitionPolicy(gpb, DEFAULT_NTYPE)
    assert node_policy.type_name == DEFAULT_NTYPE
    edge_policy = EdgePartitionPolicy(gpb, DEFAULT_ETYPE)
    assert edge_policy.type_name == DEFAULT_ETYPE

    # heterogeneous, init via a non-canonical etype is not supported
    node_map = {
        "node1": F.tensor([[0, 1000], [1000, 2000]]),
        "node2": F.tensor([[0, 1000], [1000, 2000]]),
    }
    edge_map = {"edge1": F.tensor([[0, 5000], [5000, 10000]])}
    ntypes = {"node1": 0, "node2": 1}
    etypes = {"edge1": 0}
    expect_except = False
    try:
        RangePartitionBook(
            part_id, num_parts, node_map, edge_map, ntypes, etypes
        )
    except AssertionError:
        expect_except = True
    assert expect_except
    expect_except = False
    try:
        EdgePartitionPolicy(gpb, "edge1")
    except AssertionError:
        expect_except = True
    assert expect_except

    # heterogeneous, init via canonical etype
    node_map = {
        "node1": F.tensor([[0, 1000], [1000, 2000]]),
        "node2": F.tensor([[0, 1000], [1000, 2000]]),
    }
    edge_map = {
        ("node1", "edge1", "node2"): F.tensor([[0, 5000], [5000, 10000]])
    }
    ntypes = {"node1": 0, "node2": 1}
    etypes = {("node1", "edge1", "node2"): 0}
    c_etype = list(etypes.keys())[0]
    gpb = RangePartitionBook(
        part_id, num_parts, node_map, edge_map, ntypes, etypes
    )
    assert gpb.etypes == ["edge1"]
    assert gpb.canonical_etypes == [c_etype]
    assert gpb.to_canonical_etype("edge1") == c_etype
    assert gpb.to_canonical_etype(c_etype) == c_etype
    expect_except = False
    try:
        gpb.to_canonical_etype(("node1", "edge2", "node2"))
    except BaseException:
        expect_except = True
    assert expect_except
    expect_except = False
    try:
        gpb.to_canonical_etype("edge2")
    except BaseException:
        expect_except = True
    assert expect_except

    # NodePartitionPolicy
    node_policy = NodePartitionPolicy(gpb, "node1")
    assert node_policy.type_name == "node1"
    assert node_policy.policy_str == "node~node1"
    assert node_policy.part_id == part_id
    assert node_policy.is_node
    assert node_policy.get_data_name("x").is_node()
    local_ids = th.arange(0, 1000)
    global_ids = local_ids + 1000
    assert th.equal(node_policy.to_local(global_ids), local_ids)
    assert th.all(node_policy.to_partid(global_ids) == part_id)
    assert node_policy.get_part_size() == 1000
    assert node_policy.get_size() == 2000

    # EdgePartitionPolicy
    edge_policy = EdgePartitionPolicy(gpb, c_etype)
    assert edge_policy.type_name == c_etype
    assert edge_policy.policy_str == "edge~node1:edge1:node2"
    assert edge_policy.part_id == part_id
    assert not edge_policy.is_node
    assert not edge_policy.get_data_name("x").is_node()
    local_ids = th.arange(0, 5000)
    global_ids = local_ids + 5000
    assert th.equal(edge_policy.to_local(global_ids), local_ids)
    assert th.all(edge_policy.to_partid(global_ids) == part_id)
    assert edge_policy.get_part_size() == 5000
    assert edge_policy.get_size() == 10000

    expect_except = False
    try:
        HeteroDataName(False, "edge1", "feat")
    except BaseException:
        expect_except = True
    assert expect_except
    data_name = HeteroDataName(False, c_etype, "feat")
    assert data_name.get_type() == c_etype


def test_UnknownPartitionBook():
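    """Dict-based node/edge maps in metadata should raise at most a TypeError."""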
    node_map = {"_N": {0: 0, 1: 1, 2: 2}}
    edge_map = {"_N:_E:_N": {0: 0, 1: 1, 2: 2}}

    part_metadata = {
        "num_parts": 1,
        "num_nodes": len(node_map),
        "num_edges": len(edge_map),
        "node_map": node_map,
        "edge_map": edge_map,
        "graph_name": "test_graph",
    }

    with tempfile.TemporaryDirectory() as test_dir:
        part_config = os.path.join(test_dir, "test_graph.json")
        with open(part_config, "w") as file:
            json.dump(part_metadata, file, indent=4)
        try:
            load_partition_book(part_config, 0)
        except Exception as e:
            if not isinstance(e, TypeError):
                raise e


@pytest.mark.parametrize("part_method", ["metis", "random"])
@pytest.mark.parametrize("num_parts", [1, 4])
def test_convert_dgl_partition_to_csc_sampling_graph_homo(
    part_method, num_parts
):
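    """Convert a homogeneous partition to a CSCSamplingGraph and compare."""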
    with tempfile.TemporaryDirectory() as test_dir:
        g = create_random_graph(1000)
        graph_name = "test"
        partition_graph(
            g, graph_name, num_parts, test_dir, part_method=part_method
        )
        part_config = os.path.join(test_dir, f"{graph_name}.json")
        convert_dgl_partition_to_csc_sampling_graph(part_config)
        for part_id in range(num_parts):
            orig_g = dgl.load_graphs(
                os.path.join(test_dir, f"part{part_id}/graph.dgl")
            )[0][0]
            new_g = dgl.graphbolt.load_csc_sampling_graph(
                os.path.join(test_dir, f"part{part_id}/csc_sampling_graph.tar")
            )
            orig_indptr, orig_indices, _ = orig_g.adj().csc()
            assert th.equal(orig_indptr, new_g.csc_indptr)
            assert th.equal(orig_indices, new_g.indices)
            assert new_g.node_type_offset is None
            assert all(new_g.type_per_edge == 0)
            for node_type, type_id in new_g.metadata.node_type_to_id.items():
                assert g.get_ntype_id(node_type) == type_id
            for edge_type, type_id in new_g.metadata.edge_type_to_id.items():
                assert g.get_etype_id(edge_type) == type_id


@pytest.mark.parametrize("part_method", ["metis", "random"])
@pytest.mark.parametrize("num_parts", [1, 4])
def test_convert_dgl_partition_to_csc_sampling_graph_hetero(
    part_method, num_parts
):
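    """Convert a heterogeneous partition to a CSCSamplingGraph and compare."""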
    with tempfile.TemporaryDirectory() as test_dir:
        g = create_random_hetero()
        graph_name = "test"
        partition_graph(
            g, graph_name, num_parts, test_dir, part_method=part_method
        )
        part_config = os.path.join(test_dir, f"{graph_name}.json")
        convert_dgl_partition_to_csc_sampling_graph(part_config)
        for part_id in range(num_parts):
            orig_g = dgl.load_graphs(
                os.path.join(test_dir, f"part{part_id}/graph.dgl")
            )[0][0]
            new_g = dgl.graphbolt.load_csc_sampling_graph(
                os.path.join(test_dir, f"part{part_id}/csc_sampling_graph.tar")
            )
            orig_indptr, orig_indices, _ = orig_g.adj().csc()
            assert th.equal(orig_indptr, new_g.csc_indptr)
            assert th.equal(orig_indices, new_g.indices)
            for node_type, type_id in new_g.metadata.node_type_to_id.items():
                assert g.get_ntype_id(node_type) == type_id
            for edge_type, type_id in new_g.metadata.edge_type_to_id.items():
                assert g.get_etype_id(edge_type) == type_id
            assert new_g.node_type_offset is None
            assert th.equal(orig_g.edata[dgl.ETYPE], new_g.type_per_edge)