import argparse
import json
import os
import sys
import tempfile
import unittest

import dgl
import numpy as np
import torch
from dgl.data.utils import load_graphs, load_tensors
from partition_algo.base import load_partition_meta

from utils import create_chunked_dataset

"""
TODO: skipping this test case since the dependency, mpirun, is
not yet configured in the CI framework.
"""


@unittest.skipIf(True, reason="mpi is not available in CI test framework.")
def test_parmetis_preprocessing():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        g = create_chunked_dataset(root_dir, num_chunks)

        # Trigger ParMETIS pre-processing here.
        schema_path = os.path.join(root_dir, "chunked-data/metadata.json")
        results_dir = os.path.join(root_dir, "parmetis-data")
        os.system(
            f"mpirun -np 2 python3 tools/distpartitioning/parmetis_preprocess.py "
            f"--schema {schema_path} --output {results_dir}"
        )

        # Now verify the output of the pre-processing step.
        # Read parmetis_nfiles and ensure all files are present.
        parmetis_data_dir = os.path.join(root_dir, "parmetis-data")
        assert os.path.isdir(parmetis_data_dir)
        parmetis_nodes_file = os.path.join(
            parmetis_data_dir, "parmetis_nfiles.txt"
        )
        assert os.path.isfile(parmetis_nodes_file)

        # `parmetis_nfiles.txt` should have each line in the following format.
        # <filename> <global_id_start> <global_id_end>
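        # Global ids are contiguous across files, so each file's start id should
        # equal the running total of nodes seen so far (checked below).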
        with open(parmetis_nodes_file, "r") as nodes_metafile:
            lines = nodes_metafile.readlines()
            total_node_count = 0
            for line in lines:
                tokens = line.split(" ")
                assert len(tokens) == 3
                assert os.path.isfile(tokens[0])
                assert int(tokens[1]) == total_node_count

                # check contents of each of the nodes files here
                with open(tokens[0], "r") as nodes_file:
                    node_lines = nodes_file.readlines()
                    for line in node_lines:
                        val = line.split(" ")
                        # <ntype_id> <weight_list> <mask_list> <type_node_id>
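                        # e.g. a purely hypothetical row: "0 1 0 0 1 0 0 42"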
                        assert len(val) == 8
                    node_count = len(node_lines)
                    total_node_count += node_count
                assert int(tokens[2]) == total_node_count

        # Load the metadata object.
        output_dir = os.path.join(root_dir, "chunked-data")
        json_file = os.path.join(output_dir, "metadata.json")
        assert os.path.isfile(json_file)
        with open(json_file, "rb") as f:
            meta_data = json.load(f)

        # Count the total no. of nodes.
        true_node_count = 0
        num_nodes_per_chunk = meta_data["num_nodes_per_chunk"]
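        # num_nodes_per_chunk is a nested list: one per-chunk count list per node type.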
        for node_per_part in num_nodes_per_chunk:
            true_node_count += sum(node_per_part)
        assert total_node_count == true_node_count

        # Read parmetis_efiles and ensure all files are present.
        # This file contains a list of filenames.
        parmetis_edges_file = os.path.join(
            parmetis_data_dir, "parmetis_efiles.txt"
        )
        assert os.path.isfile(parmetis_edges_file)

        with open(parmetis_edges_file, "r") as edges_metafile:
            lines = edges_metafile.readlines()
            total_edge_count = 0
            for line in lines:
                edges_filename = line.strip()
                assert os.path.isfile(edges_filename)

                with open(edges_filename, "r") as edges_file:
                    edge_lines = edges_file.readlines()
                    total_edge_count += len(edge_lines)
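                    # Each edge line carries two tokens, presumably
                    # "<global_src_id> <global_dst_id>".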
                    for line in edge_lines:
                        val = line.split(" ")
                        assert len(val) == 2

        # Count the total no. of edges
        true_edge_count = 0
        num_edges_per_chunk = meta_data["num_edges_per_chunk"]
        for edges_per_part in num_edges_per_chunk:
            true_edge_count += sum(edges_per_part)
        assert true_edge_count == total_edge_count


def test_parmetis_postprocessing():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        g = create_chunked_dataset(root_dir, num_chunks)

        num_nodes = g.number_of_nodes()
        num_institutions = g.number_of_nodes("institution")
        num_authors = g.number_of_nodes("author")
        num_papers = g.number_of_nodes("paper")

        # Generate random ParMETIS partition ids for the nodes in the graph.
        # Replace this with a call to the actual ParMETIS executable once it is ready.
        output_dir = os.path.join(root_dir, "chunked-data")
        parmetis_file = os.path.join(output_dir, "parmetis_output.txt")
        node_ids = np.arange(num_nodes)
        partition_ids = np.random.randint(0, 2, (num_nodes,))
        parmetis_output = np.column_stack([node_ids, partition_ids])
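        # Each row pairs a global node id with a randomly assigned partition id.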

        # Write the ParMETIS output file; this mimics running the actual ParMETIS binary.
        with open(parmetis_file, "w") as f:
            np.savetxt(f, parmetis_output)

        # Run the post-processing script and check its output.
        results_dir = os.path.join(output_dir, "partitions_dir")
        json_file = os.path.join(output_dir, "metadata.json")
        print(json_file)
        print(results_dir)
        print(parmetis_file)
        os.system(
            f"python3 tools/distpartitioning/parmetis_postprocess.py "
            f"--schema_file {json_file} "
            f"--parmetis_output_file {parmetis_file} "
            f"--partitions_dir {results_dir}"
        )

        ntype_count = {
            "author": num_authors,
            "paper": num_papers,
            "institution": num_institutions,
        }
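        # The post-processing step should emit one partition-id file per node type,
        # with one line per node of that type.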
        for ntype_name in ["author", "paper", "institution"]:
            fname = os.path.join(results_dir, f"{ntype_name}.txt")
            print(fname)
            assert os.path.isfile(fname)

            # Load and check the partition ids in this file.
            part_ids = np.loadtxt(fname)
            assert part_ids.shape[0] == ntype_count[ntype_name]
            assert np.min(part_ids) == 0
            assert np.max(part_ids) == 1

        # Check the partition meta file.
        part_meta_file = os.path.join(results_dir, "partition_meta.json")
        assert os.path.isfile(part_meta_file)
        part_meta = load_partition_meta(part_meta_file)
        assert part_meta.num_parts == 2
        assert part_meta.algo_name == "metis"


"""
TODO: skipping this test case since it depends on the dependency, mpi,
which is not yet configured in the CI framework.
"""


@unittest.skipIf(True, reason="mpi is not available in CI test framework.")
def test_parmetis_wrapper():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        graph_name = "mag240m"
        g = create_chunked_dataset(root_dir, num_chunks)
        all_ntypes = g.ntypes
        all_etypes = g.etypes
        num_constraints = len(all_ntypes) + 3
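        # Number of balance constraints handed to ParMETIS; the "+ 3" presumably
        # accounts for the train/val/test mask weights added during preprocessing.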
        num_institutions = g.number_of_nodes("institution")
        num_authors = g.number_of_nodes("author")
        num_papers = g.number_of_nodes("paper")

        # Trigger ParMETIS.
        schema_file = os.path.join(root_dir, "chunked-data/metadata.json")
        preproc_output_dir = os.path.join(
            root_dir, "chunked-data/preproc_output_dir"
        )
        parmetis_output_file = os.path.join(
            os.getcwd(), f"{graph_name}_part.{num_chunks}"
        )
        partitions_dir = os.path.join(root_dir, "chunked-data/partitions_dir")
        hostfile = os.path.join(root_dir, "ip_config.txt")
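        # One host entry per ParMETIS process; both ranks run on localhost here.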
        with open(hostfile, "w") as f:
            f.write("127.0.0.1\n")
            f.write("127.0.0.1\n")

        num_nodes = g.number_of_nodes()
        num_edges = g.number_of_edges()
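        # Stats file (a single line: "<num_nodes> <num_edges> <num_constraints>"),
        # presumably consumed by the ParMETIS wrapper.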
        stats_file = f"{graph_name}_stats.txt"
        with open(stats_file, "w") as f:
            f.write(f"{num_nodes} {num_edges} {num_constraints}")

        parmetis_cmd = (
            f"python3 tools/distpartitioning/parmetis_wrapper.py "
            f"--schema_file {schema_file} "
            f"--preproc_output_dir {preproc_output_dir} "
            f"--hostfile {hostfile} "
            f"--parmetis_output_file {parmetis_output_file} "
            f"--partitions_dir {partitions_dir} "
        )
        print(f"Executing the following cmd: {parmetis_cmd}")
        os.system(parmetis_cmd)

        ntype_count = {
            "author": num_authors,
            "paper": num_papers,
            "institution": num_institutions,
        }
        for ntype_name in ["author", "paper", "institution"]:
            fname = os.path.join(partitions_dir, f"{ntype_name}.txt")
            print(fname)
            assert os.path.isfile(fname)

            # Load and check the partition ids in this file.
            part_ids = np.loadtxt(fname)
            assert part_ids.shape[0] == ntype_count[ntype_name]
            assert np.min(part_ids) == 0
            assert np.max(part_ids) == (num_chunks - 1)