import argparse
import json
import os
import sys
import tempfile
import unittest

import dgl
import numpy as np
import torch
from dgl.data.utils import load_graphs, load_tensors
from partition_algo.base import load_partition_meta

from pytest_utils import create_chunked_dataset

"""
TODO: skipping this test case since the dependency, mpirun, is
not yet configured in the CI framework.
"""


@unittest.skipIf(True, reason="mpi is not available in CI test framework.")
def test_parmetis_preprocessing():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        g = create_chunked_dataset(root_dir, num_chunks)

        # Trigger ParMETIS pre-processing here.
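        # One mpirun rank is launched per chunk; the script converts the chunked
        # graph data into the node/edge files that ParMETIS consumes (verified below).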
        input_dir = os.path.join(root_dir, "chunked-data")
        results_dir = os.path.join(root_dir, "parmetis-data")
        # Path to the metadata.json emitted by create_chunked_dataset().
        schema_path = os.path.join(input_dir, "metadata.json")
        os.system(
            f"mpirun -np {num_chunks} python3 tools/distpartitioning/parmetis_preprocess.py "
            f"--schema {schema_path} "
            f"--input_dir {input_dir} "
            f"--output_dir {results_dir} "
            f"--num_parts {num_chunks}"
        )

        # Verify the preprocessing output.
        # Read parmetis_nfiles and ensure all files are present.
        parmetis_data_dir = os.path.join(root_dir, "parmetis-data")
        assert os.path.isdir(parmetis_data_dir)
        parmetis_nodes_file = os.path.join(
            parmetis_data_dir, "parmetis_nfiles.txt"
        )
        assert os.path.isfile(parmetis_nodes_file)

        # `parmetis_nfiles.txt` should have each line in the following format.
        # <filename> <global_id_start> <global_id_end>
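        # For example, a line might look like (hypothetical filename and counts):
        #   /tmp/parmetis-data/node_weights_0.txt 0 1200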
        with open(parmetis_nodes_file, "r") as nodes_metafile:
            lines = nodes_metafile.readlines()
            total_node_count = 0
            for line in lines:
                tokens = line.split(" ")
                assert len(tokens) == 3
                assert os.path.isfile(tokens[0])
                assert int(tokens[1]) == total_node_count

                # Check the contents of each node file listed in parmetis_nfiles.txt.
                with open(tokens[0], "r") as nodes_file:
                    node_lines = nodes_file.readlines()
                    for line in node_lines:
                        val = line.split(" ")
                        # <ntype_id> <weight_list> <mask_list> <type_node_id>
                        assert len(val) == 8
                    node_count = len(node_lines)
                    total_node_count += node_count
                assert int(tokens[2]) == total_node_count

        # Load the metadata of the chunked dataset.
        output_dir = os.path.join(root_dir, "chunked-data")
        json_file = os.path.join(output_dir, "metadata.json")
        assert os.path.isfile(json_file)
        with open(json_file, "rb") as f:
            meta_data = json.load(f)

        # Count the total no. of nodes.
        true_node_count = 0
        num_nodes_per_chunk = meta_data["num_nodes_per_chunk"]
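        # num_nodes_per_chunk is assumed to be a nested list: one inner list per
        # node type, holding the node count contributed by each chunk.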
        for i in range(len(num_nodes_per_chunk)):
            node_per_part = num_nodes_per_chunk[i]
            for j in range(len(node_per_part)):
                true_node_count += node_per_part[j]
        assert total_node_count == true_node_count

        # Read parmetis_efiles and ensure all files are present.
        # This file contains a list of filenames.
        parmetis_edges_file = os.path.join(
            parmetis_data_dir, "parmetis_efiles.txt"
        )
        assert os.path.isfile(parmetis_edges_file)

        with open(parmetis_edges_file, "r") as edges_metafile:
            lines = edges_metafile.readlines()
            total_edge_count = 0
            for line in lines:
                edges_filename = line.strip()
                assert os.path.isfile(edges_filename)

                with open(edges_filename, "r") as edges_file:
                    edge_lines = edges_file.readlines()
                    total_edge_count += len(edge_lines)
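                    # Each edge line is expected to hold two tokens, presumably the
                    # global source and destination node ids.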
                    for line in edge_lines:
                        val = line.split(" ")
                        assert len(val) == 2

        # Count the total no. of edges
        true_edge_count = 0
        num_edges_per_chunk = meta_data["num_edges_per_chunk"]
        for i in range(len(num_edges_per_chunk)):
            edges_per_part = num_edges_per_chunk[i]
            for j in range(len(edges_per_part)):
                true_edge_count += edges_per_part[j]
        assert true_edge_count == total_edge_count


def test_parmetis_postprocessing():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        g = create_chunked_dataset(root_dir, num_chunks)

        num_nodes = g.num_nodes()
        num_institutions = g.num_nodes("institution")
        num_authors = g.num_nodes("author")
        num_papers = g.num_nodes("paper")

        # Generate random parmetis partition ids for the nodes in the graph.
        # Replace this code with the actual ParMETIS executable once it is ready.
        output_dir = os.path.join(root_dir, "chunked-data")
        assert os.path.isdir(output_dir)

        parmetis_file = os.path.join(output_dir, "parmetis_output.txt")
        node_ids = np.arange(num_nodes)
        partition_ids = np.random.randint(0, 2, (num_nodes,))
        parmetis_output = np.column_stack([node_ids, partition_ids])

        # Create the ParMETIS output file, mimicking an actual ParMETIS run.
        with open(parmetis_file, "w") as f:
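            # Each row is written as "<global_node_id> <partition_id>"; np.savetxt
            # uses its default floating-point format here.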
            np.savetxt(f, parmetis_output)
        assert os.path.isfile(parmetis_file)

        # Run the post-processing script and check its results.
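        # It is expected to turn the global <node_id, partition_id> pairs into one
        # partition-id file per node type (checked below).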
        results_dir = os.path.join(output_dir, "partitions_dir")
        json_file = os.path.join(output_dir, "metadata.json")
        print(json_file)
        print(results_dir)
        print(parmetis_file)
        os.system(
            f"python3 tools/distpartitioning/parmetis_postprocess.py "
            f"--postproc_input_dir {output_dir} "
            f"--schema_file metadata.json "
            f"--parmetis_output_file {parmetis_file} "
            f"--partitions_dir {results_dir}"
        )

        ntype_count = {
            "author": num_authors,
            "paper": num_papers,
            "institution": num_institutions,
        }
        for ntype_name in ["author", "paper", "institution"]:
            fname = os.path.join(results_dir, f"{ntype_name}.txt")
            print(fname)
            assert os.path.isfile(fname)

            # Load and check the partition ids in this file.
            part_ids = np.loadtxt(fname)
            assert part_ids.shape[0] == ntype_count[ntype_name]
            assert np.min(part_ids) == 0
            assert np.max(part_ids) == 1

        # check partition meta file
        part_meta_file = os.path.join(results_dir, "partition_meta.json")
        assert os.path.isfile(part_meta_file)
        part_meta = load_partition_meta(part_meta_file)
        assert part_meta.num_parts == 2
        assert part_meta.algo_name == "metis"


"""
TODO: skipping this test case since it depends on the dependency, mpi,
which is not yet configured in the CI framework.
"""


@unittest.skipIf(True, reason="mpi is not available in CI test framework.")
def test_parmetis_wrapper():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        graph_name = "mag240m"
        g = create_chunked_dataset(root_dir, num_chunks)
        all_ntypes = g.ntypes
        all_etypes = g.etypes
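        # Number of balance constraints recorded in the ParMETIS stats file below.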
        num_constraints = len(all_ntypes) + 3
        num_institutions = g.num_nodes("institution")
        num_authors = g.num_nodes("author")
        num_papers = g.num_nodes("paper")

        # Trigger ParMETIS.
        schema_file = os.path.join(root_dir, "chunked-data/metadata.json")
        preproc_input_dir = os.path.join(root_dir, "chunked-data")
        preproc_output_dir = os.path.join(
            root_dir, "chunked-data/preproc_output_dir"
        )
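        # ParMETIS is expected to write its partition assignments to
        # "<graph_name>_part.<num_chunks>" in the current working directory.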
        parmetis_output_file = os.path.join(
            os.getcwd(), f"{graph_name}_part.{num_chunks}"
        )
        partitions_dir = os.path.join(root_dir, "chunked-data/partitions_dir")
        hostfile = os.path.join(root_dir, "ip_config.txt")
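        # Presumably one host entry per partition; both entries point at localhost
        # so everything runs on the local machine.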
        with open(hostfile, "w") as f:
            f.write("127.0.0.1\n")
            f.write("127.0.0.1\n")

        num_nodes = g.num_nodes()
        num_edges = g.num_edges()
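        # The stats file holds a single line of the form
        # "<num_nodes> <num_edges> <num_constraints>".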
        stats_file = f"{graph_name}_stats.txt"
        with open(stats_file, "w") as f:
            f.write(f"{num_nodes} {num_edges} {num_constraints}")

        os.system(
            f"python3 tools/distpartitioning/parmetis_wrapper.py "
            f"--schema_file {schema_file} "
            f"--preproc_input_dir {preproc_input_dir} "
            f"--preproc_output_dir {preproc_output_dir} "
            f"--hostfile {hostfile} "
            f"--num_parts {num_chunks} "
            f"--parmetis_output_file {parmetis_output_file} "
            f"--partitions_dir {partitions_dir} "
        )
        print("Executing Done.")

        ntype_count = {
            "author": num_authors,
            "paper": num_papers,
            "institution": num_institutions,
        }
        for ntype_name in ["author", "paper", "institution"]:
            fname = os.path.join(partitions_dir, f"{ntype_name}.txt")
            print(fname)
            assert os.path.isfile(fname)

            # Load and check the partition ids in this file.
            part_ids = np.loadtxt(fname)
            assert part_ids.shape[0] == ntype_count[ntype_name]
            assert np.min(part_ids) == 0
            assert np.max(part_ids) == (num_chunks - 1)