test_dist_part.py 10.9 KB
Newer Older
1
2
3
import json
import os
import tempfile
4
5
6
7
8
import unittest

import numpy as np
import pytest
import torch
9
from chunk_graph import chunk_graph
10
11
12
from create_chunked_dataset import create_chunked_dataset

import dgl
13
from dgl.data.utils import load_graphs, load_tensors
14
15


16
17
@pytest.mark.parametrize("num_chunks", [1, 8])
def test_chunk_graph(num_chunks):
18

19
    with tempfile.TemporaryDirectory() as root_dir:
20

21
        g = create_chunked_dataset(root_dir, num_chunks, include_edge_data=True)
22

23
24
25
        num_cite_edges = g.number_of_edges("cites")
        num_write_edges = g.number_of_edges("writes")
        num_affiliate_edges = g.number_of_edges("affiliated_with")
26

27
28
29
        num_institutions = g.number_of_nodes("institution")
        num_authors = g.number_of_nodes("author")
        num_papers = g.number_of_nodes("paper")
30
31

        # check metadata.json
32
33
        output_dir = os.path.join(root_dir, "chunked-data")
        json_file = os.path.join(output_dir, "metadata.json")
34
        assert os.path.isfile(json_file)
35
        with open(json_file, "rb") as f:
36
            meta_data = json.load(f)
37
38
        assert meta_data["graph_name"] == "mag240m"
        assert len(meta_data["num_nodes_per_chunk"][0]) == num_chunks
39
40

        # check edge_index
41
        output_edge_index_dir = os.path.join(output_dir, "edge_index")
42
        for utype, etype, vtype in g.canonical_etypes:
43
            fname = ":".join([utype, etype, vtype])
44
            for i in range(num_chunks):
45
                chunk_f_name = os.path.join(
46
                    output_edge_index_dir, fname + str(i) + ".txt"
47
                )
48
                assert os.path.isfile(chunk_f_name)
49
                with open(chunk_f_name, "r") as f:
50
                    header = f.readline()
51
                    num1, num2 = header.rstrip().split(" ")
52
53
54
55
                    assert isinstance(int(num1), int)
                    assert isinstance(int(num2), int)

        # check node_data
56
57
        output_node_data_dir = os.path.join(output_dir, "node_data", "paper")
        for feat in ["feat", "label", "year"]:
58
            for i in range(num_chunks):
59
                chunk_f_name = "{}-{}.npy".format(feat, i)
60
61
62
63
64
65
66
                chunk_f_name = os.path.join(output_node_data_dir, chunk_f_name)
                assert os.path.isfile(chunk_f_name)
                feat_array = np.load(chunk_f_name)
                assert feat_array.shape[0] == num_papers // num_chunks

        # check edge_data
        num_edges = {
67
68
69
            "paper:cites:paper": num_cite_edges,
            "author:writes:paper": num_write_edges,
            "paper:rev_writes:author": num_write_edges,
70
        }
71
        output_edge_data_dir = os.path.join(output_dir, "edge_data")
72
        for etype, feat in [
73
74
75
            ["paper:cites:paper", "count"],
            ["author:writes:paper", "year"],
            ["paper:rev_writes:author", "year"],
76
77
78
        ]:
            output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
            for i in range(num_chunks):
79
                chunk_f_name = "{}-{}.npy".format(feat, i)
80
81
82
                chunk_f_name = os.path.join(output_edge_sub_dir, chunk_f_name)
                assert os.path.isfile(chunk_f_name)
                feat_array = np.load(chunk_f_name)
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
            assert feat_array.shape[0] == num_edges[etype] // num_chunks


@pytest.mark.parametrize("num_chunks", [1, 2, 3, 4, 8])
@pytest.mark.parametrize("num_parts", [1, 2, 3, 4, 8])
def test_part_pipeline(num_chunks, num_parts):
    if num_chunks < num_parts:
        # num_parts should less/equal than num_chunks
        return

    include_edge_data = num_chunks == num_parts

    with tempfile.TemporaryDirectory() as root_dir:

        g = create_chunked_dataset(
            root_dir, num_chunks, include_edge_data=include_edge_data
        )

        all_ntypes = g.ntypes
        all_etypes = g.etypes

104
105
106
        num_cite_edges = g.number_of_edges("cites")
        num_write_edges = g.number_of_edges("writes")
        num_affiliate_edges = g.number_of_edges("affiliated_with")
107

108
109
110
        num_institutions = g.number_of_nodes("institution")
        num_authors = g.number_of_nodes("author")
        num_papers = g.number_of_nodes("paper")
111
112

        # Step1: graph partition
113
114
        in_dir = os.path.join(root_dir, "chunked-data")
        output_dir = os.path.join(root_dir, "parted_data")
115
        os.system(
116
117
            "python3 tools/partition_algo/random_partition.py "
            "--in_dir {} --out_dir {} --num_partitions {}".format(
118
119
120
                in_dir, output_dir, num_parts
            )
        )
121
122
123
        for ntype in ["author", "institution", "paper"]:
            fname = os.path.join(output_dir, "{}.txt".format(ntype))
            with open(fname, "r") as f:
124
125
126
127
                header = f.readline().rstrip()
                assert isinstance(int(header), int)

        # Step2: data dispatch
128
129
130
131
        partition_dir = os.path.join(root_dir, "parted_data")
        out_dir = os.path.join(root_dir, "partitioned")
        ip_config = os.path.join(root_dir, "ip_config.txt")
        with open(ip_config, "w") as f:
132
            for i in range(num_parts):
133
134
135
136
137
138
139
140
141
142
                f.write(f"127.0.0.{i + 1}\n")

        cmd = "python3 tools/dispatch_data.py"
        cmd += f" --in-dir {in_dir}"
        cmd += f" --partitions-dir {partition_dir}"
        cmd += f" --out-dir {out_dir}"
        cmd += f" --ip-config {ip_config}"
        cmd += " --process-group-timeout 60"
        cmd += " --save-orig-nids"
        cmd += " --save-orig-eids"
143
        os.system(cmd)
144
145

        # check metadata.json
146
147
        meta_fname = os.path.join(out_dir, "metadata.json")
        with open(meta_fname, "rb") as f:
148
149
150
            meta_data = json.load(f)

        for etype in all_etypes:
151
152
153
            assert len(meta_data["edge_map"][etype]) == num_parts
        assert meta_data["etypes"].keys() == set(all_etypes)
        assert meta_data["graph_name"] == "mag240m"
154
155

        for ntype in all_ntypes:
156
157
158
159
160
            assert len(meta_data["node_map"][ntype]) == num_parts
        assert meta_data["ntypes"].keys() == set(all_ntypes)
        assert meta_data["num_edges"] == g.num_edges()
        assert meta_data["num_nodes"] == g.num_nodes()
        assert meta_data["num_parts"] == num_parts
161

162
163
164
165
166
167
168
        edge_dict = {}
        edge_data_gold = {}

        if include_edge_data:
            # Create Id Map here.
            num_edges = 0
            for utype, etype, vtype in g.canonical_etypes:
169
                fname = ":".join([utype, etype, vtype])
170
171
172
173
174
175
176
177
178
179
180
                edge_dict[fname] = np.array(
                    [num_edges, num_edges + g.number_of_edges(etype)]
                ).reshape(1, 2)
                num_edges += g.number_of_edges(etype)

            assert num_edges == g.number_of_edges()
            id_map = dgl.distributed.id_map.IdMap(edge_dict)
            orig_etype_id, orig_type_eid = id_map(np.arange(num_edges))

            # check edge_data
            num_edges = {
181
182
183
                "paper:cites:paper": num_cite_edges,
                "author:writes:paper": num_write_edges,
                "paper:rev_writes:author": num_write_edges,
184
            }
185
186
            output_dir = os.path.join(root_dir, "chunked-data")
            output_edge_data_dir = os.path.join(output_dir, "edge_data")
187
            for etype, feat in [
188
189
190
                ["paper:cites:paper", "count"],
                ["author:writes:paper", "year"],
                ["paper:rev_writes:author", "year"],
191
192
193
194
            ]:
                output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
                features = []
                for i in range(num_chunks):
195
                    chunk_f_name = "{}-{}.npy".format(feat, i)
196
197
198
199
200
201
202
                    chunk_f_name = os.path.join(
                        output_edge_sub_dir, chunk_f_name
                    )
                    assert os.path.isfile(chunk_f_name)
                    feat_array = np.load(chunk_f_name)
                    assert feat_array.shape[0] == num_edges[etype] // num_chunks
                features.append(feat_array)
203
                edge_data_gold[etype + "/" + feat] = np.concatenate(features)
204

205
        for i in range(num_parts):
206
            sub_dir = "part-" + str(i)
207
            assert meta_data[sub_dir][
208
209
                "node_feats"
            ] == "part{}/node_feat.dgl".format(i)
210
            assert meta_data[sub_dir][
211
212
                "edge_feats"
            ] == "part{}/edge_feat.dgl".format(i)
213
            assert meta_data[sub_dir][
214
215
                "part_graph"
            ] == "part{}/graph.dgl".format(i)
216
217

            # check data
218
            sub_dir = os.path.join(out_dir, "part" + str(i))
219
220

            # graph.dgl
221
            fname = os.path.join(sub_dir, "graph.dgl")
222
223
            assert os.path.isfile(fname)
            g_list, data_dict = load_graphs(fname)
224
225
            part_g = g_list[0]
            assert isinstance(part_g, dgl.DGLGraph)
226
227

            # node_feat.dgl
228
            fname = os.path.join(sub_dir, "node_feat.dgl")
229
230
            assert os.path.isfile(fname)
            tensor_dict = load_tensors(fname)
231
            all_tensors = [
232
233
234
235
                "paper/feat",
                "paper/label",
                "paper/year",
                "paper/orig_ids",
236
            ]
237
238
239
            assert tensor_dict.keys() == set(all_tensors)
            for key in all_tensors:
                assert isinstance(tensor_dict[key], torch.Tensor)
240
            ndata_paper_orig_ids = tensor_dict["paper/orig_ids"]
241

242
            # orig_nids.dgl
243
            fname = os.path.join(sub_dir, "orig_nids.dgl")
244
245
246
            assert os.path.isfile(fname)
            orig_nids = load_tensors(fname)
            assert len(orig_nids.keys()) == 3
247
            assert torch.equal(ndata_paper_orig_ids, orig_nids["paper"])
248
249

            # orig_eids.dgl
250
            fname = os.path.join(sub_dir, "orig_eids.dgl")
251
252
253
254
            assert os.path.isfile(fname)
            orig_eids = load_tensors(fname)
            assert len(orig_eids.keys()) == 4

255
256
257
            if include_edge_data:

                # Read edge_feat.dgl
258
                fname = os.path.join(sub_dir, "edge_feat.dgl")
259
260
261
                assert os.path.isfile(fname)
                tensor_dict = load_tensors(fname)
                all_tensors = [
262
263
264
                    "paper:cites:paper/count",
                    "author:writes:paper/year",
                    "paper:rev_writes:author/year",
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
                ]
                assert tensor_dict.keys() == set(all_tensors)
                for key in all_tensors:
                    assert isinstance(tensor_dict[key], torch.Tensor)

                # Compare the data stored as edge features in this partition with the data
                # from the original graph.
                for idx, etype in enumerate(all_etypes):
                    if etype != key:
                        continue

                    # key in canonical form
                    tokens = key.split(":")
                    assert len(tokens) == 3

                    gold_type_ids = orig_type_eid[orig_etype_id == idx]
                    gold_data = edge_data_gold[key][gold_type_ids]
                    assert np.all(gold_data == part_data.numpy())