import json
import os
import tempfile
import unittest

import numpy as np
import pytest
import torch

from chunk_graph import chunk_graph
from create_chunked_dataset import create_chunked_dataset
from distpartitioning import array_readwriter

import dgl
from dgl.data.utils import load_graphs, load_tensors
from dgl.distributed.partition import (
    RESERVED_FIELD_DTYPE,
    _etype_tuple_to_str,
    _get_inner_edge_mask,
    _get_inner_node_mask,
    load_partition,
)


def _verify_partition_data_types(part_g):
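    # Reserved fields saved with a partition must keep the dtypes declared
    # in RESERVED_FIELD_DTYPE.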
    for k, dtype in RESERVED_FIELD_DTYPE.items():
        if k in part_g.ndata:
            assert part_g.ndata[k].dtype == dtype
        if k in part_g.edata:
            assert part_g.edata[k].dtype == dtype


def _verify_partition_formats(part_g, formats):
    # Verify the partitioned graph was saved in the requested sparse formats;
    # "coo" is created by default when no formats are given.
    if formats is None:
        assert "coo" in part_g.formats()["created"]
    else:
        for fmt in formats.split(","):
            assert fmt in part_g.formats()["created"]


def _verify_graph_feats(
    g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
):
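    # For every inner (owned) node/edge of the partition, map its shuffled ID
    # back to the original ID and compare the stored features against the
    # original graph's features.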
    for ntype in g.ntypes:
        ntype_id = g.get_ntype_id(ntype)
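        # Nodes marked "inner" are the ones owned by this partition.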
        inner_node_mask = _get_inner_node_mask(part, ntype_id)
        inner_nids = part.ndata[dgl.NID][inner_node_mask]
        ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
        partid = gpb.nid2partid(inner_type_nids, ntype)
        assert np.all(ntype_ids.numpy() == ntype_id)
        assert np.all(partid.numpy() == gpb.partid)

        orig_id = orig_nids[ntype][inner_type_nids]
        local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)

        for name in g.nodes[ntype].data:
            if name in [dgl.NID, "inner_node"]:
                continue
            true_feats = g.nodes[ntype].data[name][orig_id]
            ndata = node_feats[ntype + "/" + name][local_nids]
            assert torch.equal(ndata, true_feats)

    for etype in g.canonical_etypes:
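        # Repeat the ownership and feature checks for each canonical edge
        # type.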
        etype_id = g.get_etype_id(etype)
        inner_edge_mask = _get_inner_edge_mask(part, etype_id)
        inner_eids = part.edata[dgl.EID][inner_edge_mask]
        etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
        partid = gpb.eid2partid(inner_type_eids, etype)
        assert np.all(etype_ids.numpy() == etype_id)
        assert np.all(partid.numpy() == gpb.partid)

        orig_id = orig_eids[_etype_tuple_to_str(etype)][inner_type_eids]
        local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)

        for name in g.edges[etype].data:
            if name in [dgl.EID, "inner_edge"]:
                continue
            true_feats = g.edges[etype].data[name][orig_id]
            edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][
                local_eids
            ]
            assert torch.equal(edata, true_feats)


@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
def test_chunk_graph(num_chunks, data_fmt):
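    # Chunk a synthetic mag240m dataset, then validate everything it writes:
    # metadata.json, the per-chunk edge_index files, and node/edge features.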
    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(root_dir, num_chunks, data_fmt=data_fmt)

        # check metadata.json
        output_dir = os.path.join(root_dir, "chunked-data")
        json_file = os.path.join(output_dir, "metadata.json")
        assert os.path.isfile(json_file)
        with open(json_file, "rb") as f:
            meta_data = json.load(f)
        assert meta_data["graph_name"] == "mag240m"
        assert len(meta_data["num_nodes_per_chunk"][0]) == num_chunks

        # check edge_index
        output_edge_index_dir = os.path.join(output_dir, "edge_index")
        for c_etype in g.canonical_etypes:
            c_etype_str = _etype_tuple_to_str(c_etype)
            for i in range(num_chunks):
                fname = os.path.join(
                    output_edge_index_dir, f"{c_etype_str}{i}.txt"
                )
                assert os.path.isfile(fname)
                with open(fname, "r") as f:
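                    # The first line must contain two space-separated
                    # integers.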
                    header = f.readline()
                    num1, num2 = header.rstrip().split(" ")
                    assert isinstance(int(num1), int)
                    assert isinstance(int(num2), int)

        # check node/edge_data
        suffix = "npy" if data_fmt == "numpy" else "parquet"
        reader_fmt_meta = {"name": data_fmt}
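
        # Helper: read a feature back chunk by chunk, check each chunk's row
        # count, then compare the concatenation against the original tensor.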
        def test_data(sub_dir, feat, expected_data, expected_shape):
            data = []
            for i in range(num_chunks):
                fname = os.path.join(sub_dir, f"{feat}-{i}.{suffix}")
                assert os.path.isfile(fname)
                feat_array = array_readwriter.get_array_parser(
                    **reader_fmt_meta
                ).read(fname)
                assert feat_array.shape[0] == expected_shape
                data.append(feat_array)
            data = np.concatenate(data, 0)
            assert torch.equal(torch.from_numpy(data), expected_data)

        output_node_data_dir = os.path.join(output_dir, "node_data")
        for ntype in g.ntypes:
            sub_dir = os.path.join(output_node_data_dir, ntype)
            for feat, data in g.nodes[ntype].data.items():
                test_data(sub_dir, feat, data, g.num_nodes(ntype) // num_chunks)

        output_edge_data_dir = os.path.join(output_dir, "edge_data")
        for c_etype in g.canonical_etypes:
            c_etype_str = _etype_tuple_to_str(c_etype)
            sub_dir = os.path.join(output_edge_data_dir, c_etype_str)
            for feat, data in g.edges[c_etype].data.items():
                test_data(sub_dir, feat, data, g.num_edges(c_etype) // num_chunks)


def _test_pipeline(
    num_chunks, num_parts, world_size, graph_formats=None, data_fmt="numpy"
):
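    # End-to-end pipeline: chunk a synthetic dataset, partition it randomly,
    # dispatch the data to per-partition folders, then reload every partition
    # and verify it.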
    if num_chunks < num_parts:
        # num_parts must be less than or equal to num_chunks.
        return

    if num_parts % world_size != 0:
        # num_parts should be a multiple of world_size
        return

    with tempfile.TemporaryDirectory() as root_dir:

        g = create_chunked_dataset(root_dir, num_chunks, data_fmt=data_fmt)

        # Step 1: graph partition
        in_dir = os.path.join(root_dir, "chunked-data")
        output_dir = os.path.join(root_dir, "parted_data")
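        # Assign nodes to partitions at random via the standalone CLI tool.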
        os.system(
            "python3 tools/partition_algo/random_partition.py "
            "--in_dir {} --out_dir {} --num_partitions {}".format(
                in_dir, output_dir, num_parts
            )
        )
        for ntype in ["author", "institution", "paper"]:
            fname = os.path.join(output_dir, f"{ntype}.txt")
            with open(fname, "r") as f:
                header = f.readline().rstrip()
                assert isinstance(int(header), int)

        # Step 2: data dispatch
        partition_dir = os.path.join(root_dir, "parted_data")
        out_dir = os.path.join(root_dir, "partitioned")
        ip_config = os.path.join(root_dir, "ip_config.txt")
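        # Fake an ip_config file with one loopback address per worker.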
        with open(ip_config, "w") as f:
            for i in range(world_size):
                f.write(f"127.0.0.{i + 1}\n")

        cmd = "python3 tools/dispatch_data.py"
        cmd += f" --in-dir {in_dir}"
        cmd += f" --partitions-dir {partition_dir}"
        cmd += f" --out-dir {out_dir}"
        cmd += f" --ip-config {ip_config}"
        cmd += " --ssh-port 22"
        cmd += " --process-group-timeout 60"
        cmd += " --save-orig-nids"
        cmd += " --save-orig-eids"
        cmd += f" --graph-formats {graph_formats}" if graph_formats else ""
        os.system(cmd)

        # read original node/edge IDs
        def read_orig_ids(fname):
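            # Collect the saved IDs from every partition, concatenating them
            # in partition order so they can be indexed by shuffled IDs.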
            orig_ids = {}
            for i in range(num_parts):
                ids_path = os.path.join(out_dir, f"part{i}", fname)
                part_ids = load_tensors(ids_path)
                for key, data in part_ids.items():
                    if key not in orig_ids:
                        orig_ids[key] = data
                    else:
                        orig_ids[key] = torch.cat((orig_ids[key], data))
            return orig_ids

        orig_nids = read_orig_ids("orig_nids.dgl")
        orig_eids = read_orig_ids("orig_eids.dgl")

        # load partitions and verify
        part_config = os.path.join(out_dir, "metadata.json")
        for i in range(num_parts):
            part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
                part_config, i
            )
            _verify_partition_data_types(part_g)
            _verify_partition_formats(part_g, graph_formats)
            _verify_graph_feats(
                g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
            )


@pytest.mark.parametrize(
    "num_chunks, num_parts, world_size",
    [[8, 4, 2], [9, 6, 3], [11, 11, 1], [11, 4, 2], [5, 3, 1]],
)
def test_pipeline_basics(num_chunks, num_parts, world_size):
    _test_pipeline(num_chunks, num_parts, world_size)


@pytest.mark.parametrize(
    "graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
def test_pipeline_formats(graph_formats):
    _test_pipeline(4, 4, 4, graph_formats)


@pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
def test_pipeline_feature_format(data_fmt):
    _test_pipeline(4, 4, 4, data_fmt=data_fmt)