import json
import os
import tempfile

import numpy as np
import pytest
import torch
import pyarrow.parquet as pq

from utils import create_chunked_dataset

from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list

import dgl
from dgl.data.utils import load_graphs, load_tensors
from dgl.distributed.partition import (RESERVED_FIELD_DTYPE,
                                       _etype_tuple_to_str,
                                       _get_inner_edge_mask,
                                       _get_inner_node_mask, load_partition)


def _verify_partition_data_types(part_g):
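    # Reserved node/edge fields must keep the dtypes declared in
    # RESERVED_FIELD_DTYPE after partitioning.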
    for k, dtype in RESERVED_FIELD_DTYPE.items():
        if k in part_g.ndata:
            assert part_g.ndata[k].dtype == dtype
        if k in part_g.edata:
            assert part_g.edata[k].dtype == dtype

def _verify_partition_formats(part_g, formats):
    # Verify saved graph formats
    if formats is None:
        assert "coo" in part_g.formats()["created"]
    else:
        formats = formats.split(',')
        for fmt in formats:
            assert fmt in part_g.formats()["created"]


def _verify_graph_feats(
    g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
):
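    # For every inner node/edge of this partition, map its global ID back to
    # the original graph and compare the stored features against g's features.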
    for ntype in g.ntypes:
        ntype_id = g.get_ntype_id(ntype)
        inner_node_mask = _get_inner_node_mask(part, ntype_id)
        inner_nids = part.ndata[dgl.NID][inner_node_mask]
        ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
        partid = gpb.nid2partid(inner_type_nids, ntype)
        assert np.all(ntype_ids.numpy() == ntype_id)
        assert np.all(partid.numpy() == gpb.partid)

        orig_id = orig_nids[ntype][inner_type_nids]
        local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)

        for name in g.nodes[ntype].data:
            if name in [dgl.NID, "inner_node"]:
                continue
            true_feats = g.nodes[ntype].data[name][orig_id]
            ndata = node_feats[ntype + "/" + name][local_nids]
            assert np.array_equal(ndata.numpy(), true_feats.numpy())

    for etype in g.canonical_etypes:
        etype_id = g.get_etype_id(etype)
        inner_edge_mask = _get_inner_edge_mask(part, etype_id)
        inner_eids = part.edata[dgl.EID][inner_edge_mask]
        etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
        partid = gpb.eid2partid(inner_type_eids, etype)
        assert np.all(etype_ids.numpy() == etype_id)
        assert np.all(partid.numpy() == gpb.partid)

        orig_id = orig_eids[_etype_tuple_to_str(etype)][inner_type_eids]
        local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)

        for name in g.edges[etype].data:
            if name in [dgl.EID, "inner_edge"]:
                continue
            true_feats = g.edges[etype].data[name][orig_id]
            edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][local_eids]
            assert np.array_equal(edata.numpy(), true_feats.numpy())


def _test_chunk_graph(
    num_chunks,
    data_fmt='numpy',
    edges_fmt='csv',
    vector_rows=False,
    num_chunks_nodes=None,
    num_chunks_edges=None,
    num_chunks_node_data=None,
    num_chunks_edge_data=None
):
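    # Build a chunked dataset on disk, then verify the generated metadata.json,
    # the per-chunk edge_index files, and the node/edge feature chunks.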
    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt=data_fmt,
            edges_fmt=edges_fmt,
            vector_rows=vector_rows,
            num_chunks_nodes=num_chunks_nodes,
            num_chunks_edges=num_chunks_edges,
            num_chunks_node_data=num_chunks_node_data,
            num_chunks_edge_data=num_chunks_edge_data,
        )

        # check metadata.json
        output_dir = os.path.join(root_dir, "chunked-data")
        json_file = os.path.join(output_dir, "metadata.json")
        assert os.path.isfile(json_file)
        with open(json_file, "rb") as f:
            meta_data = json.load(f)
        assert meta_data["graph_name"] == "mag240m"
        assert len(meta_data["num_nodes_per_chunk"][0]) == num_chunks

        # check edge_index
        output_edge_index_dir = os.path.join(output_dir, "edge_index")
        for c_etype in g.canonical_etypes:
            c_etype_str = _etype_tuple_to_str(c_etype)
            if num_chunks_edges is None:
                n_chunks = num_chunks
            else:
                n_chunks = num_chunks_edges
            for i in range(n_chunks):
                fname = os.path.join(
                    output_edge_index_dir, f'{c_etype_str}{i}.txt'
                )
                assert os.path.isfile(fname)
                if edges_fmt == 'csv':
                    with open(fname, "r") as f:
                        header = f.readline()
                        num1, num2 = header.rstrip().split(" ")
                        assert isinstance(int(num1), int)
                        assert isinstance(int(num2), int)
                elif edges_fmt == 'parquet':
                    metadata = pq.read_metadata(fname)
                    assert metadata.num_columns == 2
                else:
                    assert False, f"Invalid edges_fmt: {edges_fmt}"

        # check node/edge_data
        suffix = 'npy' if data_fmt == 'numpy' else 'parquet'
        reader_fmt_meta = {"name": data_fmt}

        def test_data(
            sub_dir, feat, expected_data, expected_shape, num_chunks
        ):
            data = []
            for i in range(num_chunks):
                fname = os.path.join(sub_dir, f'{feat}-{i}.{suffix}')
                assert os.path.isfile(fname), f'{fname} cannot be found.'
                feat_array = array_readwriter.get_array_parser(
                    **reader_fmt_meta
                ).read(fname)
                assert feat_array.shape[0] == expected_shape
                data.append(feat_array)
            data = np.concatenate(data, 0)
            assert torch.equal(torch.from_numpy(data), expected_data)

        output_node_data_dir = os.path.join(output_dir, "node_data")
        for ntype in g.ntypes:
            sub_dir = os.path.join(output_node_data_dir, ntype)
            if isinstance(num_chunks_node_data, int):
                chunks_data = num_chunks_node_data
            elif isinstance(num_chunks_node_data, dict):
                chunks_data = num_chunks_node_data.get(ntype, num_chunks)
            else:
                chunks_data = num_chunks
            for feat, data in g.nodes[ntype].data.items():
                if isinstance(chunks_data, dict):
                    n_chunks = chunks_data.get(feat, num_chunks)
                else:
                    n_chunks = chunks_data
                test_data(
                    sub_dir, feat, data,
                    g.num_nodes(ntype) // n_chunks, n_chunks
                )

        output_edge_data_dir = os.path.join(output_dir, "edge_data")
        for c_etype in g.canonical_etypes:
            c_etype_str = _etype_tuple_to_str(c_etype)
            sub_dir = os.path.join(output_edge_data_dir, c_etype_str)
            if isinstance(num_chunks_edge_data, int):
                chunks_data = num_chunks_edge_data
            elif isinstance(num_chunks_edge_data, dict):
                chunks_data = num_chunks_edge_data.get(c_etype, num_chunks)
            else:
                chunks_data = num_chunks
            for feat, data in g.edges[c_etype].data.items():
                if isinstance(chunks_data, dict):
                    n_chunks = chunks_data.get(feat, num_chunks)
                else:
                    n_chunks = chunks_data
                test_data(
                    sub_dir, feat, data,
                    g.num_edges(c_etype) // n_chunks, n_chunks
                )


@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("data_fmt", ['numpy', 'parquet'])
@pytest.mark.parametrize("edges_fmt", ['csv', 'parquet'])
def test_chunk_graph_basics(num_chunks, data_fmt, edges_fmt):
    _test_chunk_graph(num_chunks, data_fmt=data_fmt, edges_fmt=edges_fmt)

@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("vector_rows", [True, False])
def test_chunk_graph_vector_rows(num_chunks, vector_rows):
    _test_chunk_graph(
        num_chunks, data_fmt='parquet', edges_fmt='parquet',
        vector_rows=vector_rows
    )


@pytest.mark.parametrize(
    "num_chunks, "
    "num_chunks_nodes, "
    "num_chunks_edges, "
    "num_chunks_node_data, "
    "num_chunks_edge_data",
    [
        [1, None, None, None, None],
        [8, None, None, None, None],
        [4, 4, 4, 8, 12],
        [4, 4, 4, {'paper': 10}, {('author', 'writes', 'paper'): 24}],
        [4, 4, 4, {'paper': {'feat': 10}},
            {('author', 'writes', 'paper'): {'year': 24}}],
    ]
)
def test_chunk_graph_arbitrary_chunks(
    num_chunks,
    num_chunks_nodes,
    num_chunks_edges,
    num_chunks_node_data,
    num_chunks_edge_data
):
    _test_chunk_graph(
        num_chunks,
        num_chunks_nodes=num_chunks_nodes,
        num_chunks_edges=num_chunks_edges,
        num_chunks_node_data=num_chunks_node_data,
        num_chunks_edge_data=num_chunks_edge_data
    )


def _test_pipeline(
    num_chunks,
    num_parts,
    world_size,
    graph_formats=None,
    data_fmt='numpy',
    num_chunks_nodes=None,
    num_chunks_edges=None,
    num_chunks_node_data=None,
    num_chunks_edge_data=None
):
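    # End-to-end pipeline test: chunk the dataset, partition it randomly,
    # dispatch the data, then load and verify every partition.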
    if num_chunks < num_parts:
        # num_parts should be less than or equal to num_chunks
        return

    if num_parts % world_size != 0:
        # num_parts should be a multiple of world_size
        return

    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt=data_fmt,
            num_chunks_nodes=num_chunks_nodes,
            num_chunks_edges=num_chunks_edges,
            num_chunks_node_data=num_chunks_node_data,
            num_chunks_edge_data=num_chunks_edge_data,
        )

        # Step1: graph partition
        in_dir = os.path.join(root_dir, "chunked-data")
        output_dir = os.path.join(root_dir, "parted_data")
        os.system(
            "python3 tools/partition_algo/random_partition.py "
            "--in_dir {} --out_dir {} --num_partitions {}".format(
                in_dir, output_dir, num_parts
            )
        )
        for ntype in ["author", "institution", "paper"]:
            fname = os.path.join(output_dir, "{}.txt".format(ntype))
            with open(fname, "r") as f:
                header = f.readline().rstrip()
                assert isinstance(int(header), int)

        # Step2: data dispatch
        partition_dir = os.path.join(root_dir, 'parted_data')
        out_dir = os.path.join(root_dir, 'partitioned')
        ip_config = os.path.join(root_dir, 'ip_config.txt')
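        # Write a fake ip_config with one loopback address per worker.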
        with open(ip_config, 'w') as f:
            for i in range(world_size):
                f.write(f'127.0.0.{i + 1}\n')

        cmd = "python3 tools/dispatch_data.py"
        cmd += f" --in-dir {in_dir}"
        cmd += f" --partitions-dir {partition_dir}"
        cmd += f" --out-dir {out_dir}"
        cmd += f" --ip-config {ip_config}"
        cmd += " --ssh-port 22"
        cmd += " --process-group-timeout 60"
        cmd += " --save-orig-nids"
        cmd += " --save-orig-eids"
        cmd += f" --graph-formats {graph_formats}" if graph_formats else ""
        os.system(cmd)

        # read original node/edge IDs
        def read_orig_ids(fname):
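            # Collect the saved original IDs from all partitions, keyed by
            # node/edge type and concatenated in partition order.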
            orig_ids = {}
            for i in range(num_parts):
                ids_path = os.path.join(out_dir, f"part{i}", fname)
                part_ids = load_tensors(ids_path)
                for type, data in part_ids.items():
                    if type not in orig_ids:
                        orig_ids[type] = data
                    else:
                        orig_ids[type] = torch.cat((orig_ids[type], data))
            return orig_ids

        orig_nids = read_orig_ids("orig_nids.dgl")
        orig_eids = read_orig_ids("orig_eids.dgl")

        # load partitions and verify
        part_config = os.path.join(out_dir, "metadata.json")
        for i in range(num_parts):
            part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
                part_config, i
            )
            _verify_partition_data_types(part_g)
            _verify_partition_formats(part_g, graph_formats)
            _verify_graph_feats(
                g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
            )


@pytest.mark.parametrize("num_chunks, num_parts, world_size",
    [[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]]
)
def test_pipeline_basics(num_chunks, num_parts, world_size):
    _test_pipeline(num_chunks, num_parts, world_size)


@pytest.mark.parametrize(
    "num_chunks, "
    "num_parts, "
    "world_size, "
    "num_chunks_node_data, "
    "num_chunks_edge_data",
    [
        [8, 4, 2, 20, 25],
        [9, 7, 5, 3, 11],
        [8, 8, 4, 3, 5],
        [8, 4, 2, {'paper': {'feat': 11, 'year': 1}},
            {('author', 'writes', 'paper'): {'year': 24}}],
    ]
)
def test_pipeline_arbitrary_chunks(
    num_chunks,
    num_parts,
    world_size,
    num_chunks_node_data,
    num_chunks_edge_data,
):
    _test_pipeline(
        num_chunks,
        num_parts,
        world_size,
        num_chunks_node_data=num_chunks_node_data,
        num_chunks_edge_data=num_chunks_edge_data,
    )


@pytest.mark.parametrize(
    "graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
def test_pipeline_formats(graph_formats):
    _test_pipeline(4, 4, 4, graph_formats)


@pytest.mark.parametrize(
    "data_fmt", ["numpy", "parquet"]
)
def test_pipeline_feature_format(data_fmt):
    _test_pipeline(4, 4, 4, data_fmt=data_fmt)


def test_utils_generate_read_list():
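    # generate_read_list(10, 4) should split file IDs 0-9 across 4 workers
    # as evenly as possible.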
    read_list = generate_read_list(10, 4)
    assert np.array_equal(read_list[0], np.array([0, 1, 2]))
    assert np.array_equal(read_list[1], np.array([3, 4, 5]))
    assert np.array_equal(read_list[2], np.array([6, 7]))
    assert np.array_equal(read_list[3], np.array([8, 9]))