import json
import os
import tempfile

import dgl

import numpy as np
import pyarrow.parquet as pq
import pytest
import torch
from dgl.data.utils import load_graphs, load_tensors
from dgl.distributed.partition import (
    _etype_tuple_to_str,
    _get_inner_edge_mask,
    _get_inner_node_mask,
    load_partition,
    RESERVED_FIELD_DTYPE,
)

from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list
from utils import create_chunked_dataset


def _verify_partition_data_types(part_g):
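    """Check that reserved node/edge fields keep the dtypes in RESERVED_FIELD_DTYPE."""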
    for k, dtype in RESERVED_FIELD_DTYPE.items():
        if k in part_g.ndata:
            assert part_g.ndata[k].dtype == dtype
        if k in part_g.edata:
            assert part_g.edata[k].dtype == dtype


def _verify_partition_formats(part_g, formats):
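    """Check that the partition graph was created with the requested sparse formats."""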
    # Verify saved graph formats
    if formats is None:
        assert "coo" in part_g.formats()["created"]
    else:
        formats = formats.split(",")
        for format in formats:
            assert format in part_g.formats()["created"]


def _verify_graph_feats(
    g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
):
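    """Compare node/edge features of one partition against the original graph.

    Inner IDs are mapped back to per-type original IDs via the saved
    orig_nids/orig_eids so the dispatched features can be checked row by row.
    """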
    for ntype in g.ntypes:
        ntype_id = g.get_ntype_id(ntype)
        inner_node_mask = _get_inner_node_mask(part, ntype_id)
        inner_nids = part.ndata[dgl.NID][inner_node_mask]
        ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
        partid = gpb.nid2partid(inner_type_nids, ntype)
        assert np.all(ntype_ids.numpy() == ntype_id)
        assert np.all(partid.numpy() == gpb.partid)

        orig_id = orig_nids[ntype][inner_type_nids]
        local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)

        for name in g.nodes[ntype].data:
            if name in [dgl.NID, "inner_node"]:
                continue
            true_feats = g.nodes[ntype].data[name][orig_id]
            ndata = node_feats[ntype + "/" + name][local_nids]
            assert np.array_equal(ndata.numpy(), true_feats.numpy())

    for etype in g.canonical_etypes:
        etype_id = g.get_etype_id(etype)
        inner_edge_mask = _get_inner_edge_mask(part, etype_id)
        inner_eids = part.edata[dgl.EID][inner_edge_mask]
        etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
        partid = gpb.eid2partid(inner_type_eids, etype)
        assert np.all(etype_ids.numpy() == etype_id)
        assert np.all(partid.numpy() == gpb.partid)

        orig_id = orig_eids[_etype_tuple_to_str(etype)][inner_type_eids]
        local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)

        for name in g.edges[etype].data:
            if name in [dgl.EID, "inner_edge"]:
                continue
            true_feats = g.edges[etype].data[name][orig_id]
            edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][
                local_eids
            ]
            assert np.array_equal(edata.numpy(), true_feats.numpy())


def _test_chunk_graph(
    num_chunks,
    data_fmt="numpy",
    edges_fmt="csv",
    vector_rows=False,
    num_chunks_nodes=None,
    num_chunks_edges=None,
    num_chunks_node_data=None,
    num_chunks_edge_data=None,
):
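    """Create a chunked dataset on disk and validate its layout: metadata.json,
    per-chunk edge_index files, and per-chunk node/edge data files.
    """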
    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt=data_fmt,
            edges_fmt=edges_fmt,
            vector_rows=vector_rows,
            num_chunks_nodes=num_chunks_nodes,
            num_chunks_edges=num_chunks_edges,
            num_chunks_node_data=num_chunks_node_data,
            num_chunks_edge_data=num_chunks_edge_data,
        )

        # check metadata.json
        output_dir = os.path.join(root_dir, "chunked-data")
        json_file = os.path.join(output_dir, "metadata.json")
        assert os.path.isfile(json_file)
        with open(json_file, "rb") as f:
            meta_data = json.load(f)
        assert meta_data["graph_name"] == "mag240m"
        assert len(meta_data["num_nodes_per_chunk"][0]) == num_chunks

        # check edge_index
        output_edge_index_dir = os.path.join(output_dir, "edge_index")
        for c_etype in g.canonical_etypes:
            c_etype_str = _etype_tuple_to_str(c_etype)
            if num_chunks_edges is None:
                n_chunks = num_chunks
            else:
                n_chunks = num_chunks_edges
            for i in range(n_chunks):
                fname = os.path.join(
                    output_edge_index_dir, f"{c_etype_str}{i}.txt"
                )
                assert os.path.isfile(fname)
                if edges_fmt == "csv":
                    with open(fname, "r") as f:
                        header = f.readline()
                        num1, num2 = header.rstrip().split(" ")
                        assert isinstance(int(num1), int)
                        assert isinstance(int(num2), int)
                elif edges_fmt == "parquet":
                    metadata = pq.read_metadata(fname)
                    assert metadata.num_columns == 2
                else:
                    assert False, f"Invalid edges_fmt: {edges_fmt}"

        # check node/edge_data
        suffix = "npy" if data_fmt == "numpy" else "parquet"
        reader_fmt_meta = {"name": data_fmt}

        def test_data(sub_dir, feat, expected_data, expected_shape, num_chunks):
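            # Read every chunk of `feat`, check the per-chunk row count, then
            # compare the concatenated array against the in-memory tensor.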
            data = []
            for i in range(num_chunks):
                fname = os.path.join(sub_dir, f"{feat}-{i}.{suffix}")
                assert os.path.isfile(fname), f"{fname} cannot be found."
                feat_array = array_readwriter.get_array_parser(
                    **reader_fmt_meta
                ).read(fname)
                assert feat_array.shape[0] == expected_shape
                data.append(feat_array)
            data = np.concatenate(data, 0)
            assert torch.equal(torch.from_numpy(data), expected_data)

        output_node_data_dir = os.path.join(output_dir, "node_data")
        for ntype in g.ntypes:
            sub_dir = os.path.join(output_node_data_dir, ntype)
            if isinstance(num_chunks_node_data, int):
                chunks_data = num_chunks_node_data
            elif isinstance(num_chunks_node_data, dict):
                chunks_data = num_chunks_node_data.get(ntype, num_chunks)
            else:
                chunks_data = num_chunks
            for feat, data in g.nodes[ntype].data.items():
                if isinstance(chunks_data, dict):
                    n_chunks = chunks_data.get(feat, num_chunks)
                else:
                    n_chunks = chunks_data
                test_data(
                    sub_dir,
                    feat,
                    data,
                    g.num_nodes(ntype) // n_chunks,
                    n_chunks,
                )

        output_edge_data_dir = os.path.join(output_dir, "edge_data")
        for c_etype in g.canonical_etypes:
            c_etype_str = _etype_tuple_to_str(c_etype)
            sub_dir = os.path.join(output_edge_data_dir, c_etype_str)
            if isinstance(num_chunks_edge_data, int):
                chunks_data = num_chunks_edge_data
            elif isinstance(num_chunks_edge_data, dict):
                chunks_data = num_chunks_edge_data.get(c_etype, num_chunks)
            else:
                chunks_data = num_chunks
            for feat, data in g.edges[c_etype].data.items():
                if isinstance(chunks_data, dict):
                    n_chunks = chunks_data.get(feat, num_chunks)
                else:
                    n_chunks = chunks_data
                test_data(
                    sub_dir,
                    feat,
                    data,
                    g.num_edges(c_etype) // n_chunks,
                    n_chunks,
                )


@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
@pytest.mark.parametrize("edges_fmt", ["csv", "parquet"])
def test_chunk_graph_basics(num_chunks, data_fmt, edges_fmt):
    _test_chunk_graph(num_chunks, data_fmt=data_fmt, edges_fmt=edges_fmt)


@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("vector_rows", [True, False])
def test_chunk_graph_vector_rows(num_chunks, vector_rows):
    _test_chunk_graph(
        num_chunks,
        data_fmt="parquet",
        edges_fmt="parquet",
        vector_rows=vector_rows,
    )


@pytest.mark.parametrize(
    "num_chunks, "
    "num_chunks_nodes, "
    "num_chunks_edges, "
    "num_chunks_node_data, "
    "num_chunks_edge_data",
    [
        [1, None, None, None, None],
        [8, None, None, None, None],
        [4, 4, 4, 8, 12],
        [4, 4, 4, {"paper": 10}, {("author", "writes", "paper"): 24}],
        [
            4,
            4,
            4,
            {"paper": {"feat": 10}},
            {("author", "writes", "paper"): {"year": 24}},
        ],
    ],
)
def test_chunk_graph_arbitrary_chunks(
    num_chunks,
    num_chunks_nodes,
    num_chunks_edges,
    num_chunks_node_data,
    num_chunks_edge_data,
):
    _test_chunk_graph(
        num_chunks,
        num_chunks_nodes=num_chunks_nodes,
        num_chunks_edges=num_chunks_edges,
        num_chunks_node_data=num_chunks_node_data,
        num_chunks_edge_data=num_chunks_edge_data,
    )


def _test_pipeline(
    num_chunks,
    num_parts,
    world_size,
    graph_formats=None,
    data_fmt="numpy",
    num_chunks_nodes=None,
    num_chunks_edges=None,
    num_chunks_node_data=None,
    num_chunks_edge_data=None,
):
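    """Run chunking, random partitioning and data dispatch end to end, then
    reload every partition and verify its contents against the original graph.
    """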
    if num_chunks < num_parts:
        # num_parts should be less than or equal to num_chunks
        return

    if num_parts % world_size != 0:
        # num_parts should be a multiple of world_size
        return

    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt=data_fmt,
            num_chunks_nodes=num_chunks_nodes,
            num_chunks_edges=num_chunks_edges,
            num_chunks_node_data=num_chunks_node_data,
            num_chunks_edge_data=num_chunks_edge_data,
        )

        # Step 1: graph partition
        in_dir = os.path.join(root_dir, "chunked-data")
        output_dir = os.path.join(root_dir, "parted_data")
        os.system(
            "python3 tools/partition_algo/random_partition.py "
            "--in_dir {} --out_dir {} --num_partitions {}".format(
                in_dir, output_dir, num_parts
            )
        )
        for ntype in ["author", "institution", "paper"]:
            fname = os.path.join(output_dir, "{}.txt".format(ntype))
            with open(fname, "r") as f:
                header = f.readline().rstrip()
                assert isinstance(int(header), int)

        # Step 2: data dispatch
        partition_dir = os.path.join(root_dir, "parted_data")
        out_dir = os.path.join(root_dir, "partitioned")
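        # Write an ip_config with one loopback address per worker for the
        # dispatch step.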
        ip_config = os.path.join(root_dir, "ip_config.txt")
        with open(ip_config, "w") as f:
            for i in range(world_size):
                f.write(f"127.0.0.{i + 1}\n")

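        # Assemble the dispatch_data.py command; --save-orig-nids and
        # --save-orig-eids persist the mapping back to the original node/edge
        # IDs so the dispatched features can be verified below.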
        cmd = "python3 tools/dispatch_data.py"
        cmd += f" --in-dir {in_dir}"
        cmd += f" --partitions-dir {partition_dir}"
        cmd += f" --out-dir {out_dir}"
        cmd += f" --ip-config {ip_config}"
        cmd += " --ssh-port 22"
        cmd += " --process-group-timeout 60"
        cmd += " --save-orig-nids"
        cmd += " --save-orig-eids"
        cmd += f" --graph-formats {graph_formats}" if graph_formats else ""
        os.system(cmd)

        # read original node/edge IDs
        def read_orig_ids(fname):
            orig_ids = {}
            for i in range(num_parts):
                ids_path = os.path.join(out_dir, f"part{i}", fname)
                part_ids = load_tensors(ids_path)
                for type, data in part_ids.items():
                    if type not in orig_ids:
                        orig_ids[type] = data
                    else:
                        orig_ids[type] = torch.cat((orig_ids[type], data))
            return orig_ids

        orig_nids = read_orig_ids("orig_nids.dgl")
        orig_eids = read_orig_ids("orig_eids.dgl")

        # load partitions and verify
        part_config = os.path.join(out_dir, "metadata.json")
        for i in range(num_parts):
            part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
                part_config, i
            )
            _verify_partition_data_types(part_g)
            _verify_partition_formats(part_g, graph_formats)
            _verify_graph_feats(
                g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
            )


@pytest.mark.parametrize(
    "num_chunks, num_parts, world_size",
    [[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]],
)
def test_pipeline_basics(num_chunks, num_parts, world_size):
    _test_pipeline(num_chunks, num_parts, world_size)


@pytest.mark.parametrize(
    "graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
def test_pipeline_formats(graph_formats):
    _test_pipeline(4, 4, 4, graph_formats)


@pytest.mark.parametrize(
    "num_chunks, "
    "num_parts, "
    "world_size, "
    "num_chunks_node_data, "
    "num_chunks_edge_data",
    [
        [8, 4, 2, 20, 25],
        [9, 7, 5, 3, 11],
        [8, 8, 4, 3, 5],
        [
            8,
            4,
            2,
            {"paper": {"feat": 11, "year": 1}},
            {("author", "writes", "paper"): {"year": 24}},
        ],
    ],
)
def test_pipeline_arbitrary_chunks(
    num_chunks,
    num_parts,
    world_size,
    num_chunks_node_data,
    num_chunks_edge_data,
):
    _test_pipeline(
        num_chunks,
        num_parts,
        world_size,
        num_chunks_node_data=num_chunks_node_data,
        num_chunks_edge_data=num_chunks_edge_data,
    )


@pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
def test_pipeline_feature_format(data_fmt):
    _test_pipeline(4, 4, 4, data_fmt=data_fmt)


def test_utils_generate_read_list():
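    # generate_read_list splits 10 file indices across 4 workers into
    # near-equal contiguous groups.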
    read_list = generate_read_list(10, 4)
    assert np.array_equal(read_list[0], np.array([0, 1, 2]))
    assert np.array_equal(read_list[1], np.array([3, 4, 5]))
    assert np.array_equal(read_list[2], np.array([6, 7]))
    assert np.array_equal(read_list[3], np.array([8, 9]))