# test_utils.py
1
import json
2
import os
3
import re
4
5
import tempfile

6
import dgl.graphbolt.internal as internal
7
import numpy as np
8
import pandas as pd
9
10
11
12
13
14
15
16
17
import pytest
import torch


def test_read_torch_data():
    """Round-trip a tensor through ``torch.save`` and ``_read_torch_data``."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        tensor_path = os.path.join(tmp_dir, "save_tensor.pt")
        expected = torch.tensor([[1, 2, 4], [2, 5, 3]])
        torch.save(expected, tensor_path)

        loaded = internal.utils._read_torch_data(tensor_path)
        assert torch.equal(expected, loaded)

        # Drop both references before the temporary directory is removed.
        expected = None
        loaded = None


@pytest.mark.parametrize("in_memory", [True, False])
def test_read_numpy_data(in_memory):
    """Check that ``_read_numpy_data`` returns what ``np.save`` wrote.

    Runs once with ``in_memory=True`` and once with ``in_memory=False``; the
    flag is forwarded unchanged to the function under test.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        array_path = os.path.join(tmp_dir, "save_numpy.npy")
        expected = np.array([[1, 2, 4], [2, 5, 3]])
        np.save(array_path, expected)

        loaded = internal.utils._read_numpy_data(array_path, in_memory)
        assert torch.equal(torch.from_numpy(expected), loaded)

        # Drop both references before the temporary directory is removed.
        expected = None
        loaded = None


@pytest.mark.parametrize("fmt", ["torch", "numpy"])
def test_read_data(fmt):
    """Write a small array in ``fmt`` and expect ``read_data`` to return it."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        expected = np.array([[1, 2, 4], [2, 5, 3]])
        suffix = "npy" if fmt != "torch" else "pt"
        data_path = os.path.join(tmp_dir, f"save_data.{suffix}")

        # Persist the fixture with the writer that matches the format.
        if fmt == "torch":
            torch.save(torch.from_numpy(expected), data_path)
        elif fmt == "numpy":
            np.save(data_path, expected)

        loaded = internal.read_data(data_path, fmt)
        assert torch.equal(torch.from_numpy(expected), loaded)


@pytest.mark.parametrize(
    "data_fmt, save_fmt, contiguous",
    [
        (data_fmt, save_fmt, contiguous)
        for data_fmt in ("torch", "numpy")
        for save_fmt in ("torch", "numpy")
        for contiguous in (True, False)
    ],
)
def test_save_data(data_fmt, save_fmt, contiguous):
    """Save via ``internal.save_data`` and verify the file that was written.

    ``data_fmt`` selects whether a tensor or a NumPy array is handed to
    ``save_data``; ``save_fmt`` is the on-disk format; ``contiguous=False``
    feeds a Fortran-ordered input. The reloaded data must be contiguous
    (C-contiguous for NumPy) and equal to the input.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        array = np.array([[1, 2, 4], [2, 5, 3]])
        if not contiguous:
            array = np.asfortranarray(array)
        tensor = torch.from_numpy(array)
        suffix = "npy" if save_fmt != "torch" else "pt"
        out_path = os.path.join(tmp_dir, f"save_data.{suffix}")

        # Step 1: save, passing the object that matches ``data_fmt``.
        sources = {"torch": tensor, "numpy": array}
        if data_fmt in sources:
            internal.save_data(sources[data_fmt], out_path, save_fmt)

        # Step 2: reload with the native loader and check layout + content.
        if save_fmt == "torch":
            restored = torch.load(out_path)
            assert restored.is_contiguous()
            assert torch.equal(tensor, restored)
        elif save_fmt == "numpy":
            restored = np.load(out_path)
            # The stored array is expected to be C-contiguous.
            assert restored.flags["C_CONTIGUOUS"]
            assert np.array_equal(tensor.numpy(), restored)

        # Drop all references before the temporary directory is removed.
        array = None
        tensor = None
        restored = None
87
88
89
90
91
92
93
94
95
96


@pytest.mark.parametrize("fmt", ["torch", "numpy"])
def test_get_npy_dim(fmt):
    """Expect ``get_npy_dim`` to return 2 for a 2-D ``.npy`` file.

    For a file written by ``torch.save`` the call is expected to raise
    ``ValueError``.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        array = np.array([[1, 2, 4], [2, 5, 3]])
        suffix = "npy" if fmt != "torch" else "pt"
        data_path = os.path.join(tmp_dir, f"save_data.{suffix}")

        if fmt == "torch":
            torch.save(torch.from_numpy(array), data_path)
            with pytest.raises(ValueError):
                internal.get_npy_dim(data_path)
        elif fmt == "numpy":
            np.save(data_path, array)
            assert internal.get_npy_dim(data_path) == 2

        array = None
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121


@pytest.mark.parametrize("data_fmt", ["numpy", "torch"])
@pytest.mark.parametrize("save_fmt", ["numpy", "torch"])
@pytest.mark.parametrize("is_feature", [True, False])
def test_copy_or_convert_data(data_fmt, save_fmt, is_feature):
    """Exercise ``copy_or_convert_data`` over input/output format pairs.

    A ``"torch"`` output format is expected to raise ``AssertionError``.
    For a ``"numpy"`` output the written file must equal the input, which is
    reshaped to a single column when ``is_feature`` is true.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        array = np.arange(10)
        tensor = torch.from_numpy(array)

        in_suffix = "pt" if data_fmt != "numpy" else "npy"
        out_suffix = "pt" if save_fmt != "numpy" else "npy"
        input_path = os.path.join(tmp_dir, f"data.{in_suffix}")
        output_path = os.path.join(tmp_dir, f"out_data.{out_suffix}")

        # Write the input file in the requested source format.
        if data_fmt == "numpy":
            np.save(input_path, array)
        else:
            torch.save(tensor, input_path)

        convert_args = (input_path, output_path, data_fmt, save_fmt)
        if save_fmt == "torch":
            with pytest.raises(AssertionError):
                internal.copy_or_convert_data(
                    *convert_args, is_feature=is_feature
                )
        else:
            internal.copy_or_convert_data(*convert_args, is_feature=is_feature)

        # Feature data is compared as a column vector.
        if is_feature:
            array = array.reshape(-1, 1)
            tensor = tensor.reshape(-1, 1)

        if save_fmt == "numpy":
            written = np.load(output_path)
            assert (array == written).all()

        # Drop all references before the temporary directory is removed.
        array = None
        tensor = None
        written = None
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203


@pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_read_edges(edge_fmt):
    """Write an edge list as CSV or ``.npy`` and read it back via ``read_edges``.

    The source column is ``np.arange(40)`` with every id repeated five times;
    the destination column is random. Both columns returned by
    ``internal.read_edges`` must match the written data element-wise.
    """
    with tempfile.TemporaryDirectory() as test_dir:
        num_nodes = 40
        num_edges = 200
        nodes = np.repeat(np.arange(num_nodes), 5)
        neighbors = np.random.randint(0, num_nodes, size=(num_edges))
        edges = np.stack([nodes, neighbors], axis=1)
        os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
        if edge_fmt == "csv":
            # Write into edges/edge.csv
            edges = pd.DataFrame(edges, columns=["src", "dst"])
            edge_path = os.path.join("edges", "edge.csv")
            edges.to_csv(
                os.path.join(test_dir, edge_path),
                index=False,
                header=False,
            )
        else:
            # Write into edges/edge.npy
            edges = edges.T
            edge_path = os.path.join("edges", "edge.npy")
            np.save(os.path.join(test_dir, edge_path), edges)
        src, dst = internal.read_edges(test_dir, edge_fmt, edge_path)
        # Compare contents element-wise. The previous form
        # ``src.all() == nodes.all()`` only compared two scalar booleans, so
        # it passed for almost any pair of arrays. ``np.asarray`` lets the
        # check accept either NumPy arrays or CPU tensors.
        assert np.array_equal(np.asarray(src), nodes)
        assert np.array_equal(np.asarray(dst), neighbors)


def test_read_edges_error():
    """Check the assertion messages ``read_edges`` is expected to produce."""
    # Case 1: an edge format other than `numpy` or `csv`.
    unsupported_fmt_msg = (
        "`numpy` or `csv` is expected when reading edges but got `fake-type`."
    )
    with pytest.raises(AssertionError, match=unsupported_fmt_msg):
        internal.read_edges("test_dir", "fake-type", "edge_path")

    # Case 2: a numpy edge file that has three rows.
    with tempfile.TemporaryDirectory() as tmp_dir:
        node_count = 40
        edge_count = 200
        nodes = np.repeat(np.arange(node_count), 5)
        neighbors = np.random.randint(0, node_count, size=(edge_count))

        # Stack three columns and transpose, giving a (3, 200) array that is
        # saved as edges/edge.npy.
        bad_edges = np.stack([nodes, neighbors, nodes], axis=1).T
        os.makedirs(os.path.join(tmp_dir, "edges"), exist_ok=True)
        edge_path = os.path.join("edges", "edge.npy")
        np.save(os.path.join(tmp_dir, edge_path), bad_edges)

        bad_shape_msg = re.escape(
            "The shape of edges should be (2, N), but got torch.Size([3, 200])."
        )
        with pytest.raises(AssertionError, match=bad_shape_msg):
            internal.read_edges(tmp_dir, "numpy", edge_path)
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268


def test_calculate_file_hash():
    """Check the MD5 digest of a known file and the unknown-algorithm error."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        sample_path = os.path.join(tmp_dir, "test.txt")
        with open(sample_path, "w") as handle:
            handle.write("test content")

        digest = internal.calculate_file_hash(sample_path, hash_algo="md5")
        assert "9473fdd0d880a43c21b7778d34872157" == digest

        # An algorithm name outside the listed set is expected to raise
        # ValueError with this exact message.
        unsupported_algo_msg = re.escape(
            "Hash algorithm must be one of: ['md5', 'sha1', 'sha224', "
            "'sha256', 'sha384', 'sha512'], but got `fake`."
        )
        with pytest.raises(ValueError, match=unsupported_algo_msg):
            internal.calculate_file_hash(sample_path, hash_algo="fake")


def test_calculate_dir_hash():
    """Hash a directory of two text files and check both digests are known."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        file_contents = {
            "test_1.txt": "test content",
            "test_2.txt": "test contents of directory",
        }
        for file_name, text in file_contents.items():
            with open(os.path.join(tmp_dir, file_name), "w") as handle:
                handle.write(text)

        dir_hashes = internal.calculate_dir_hash(tmp_dir, hash_algo="md5")

        known_digests = [
            "56e708a2bdf92887d4a7f25cbc13c555",
            "9473fdd0d880a43c21b7778d34872157",
        ]
        # Exactly one entry per file, each carrying one of the known digests.
        assert len(dir_hashes) == 2
        for digest in dir_hashes.values():
            assert digest in known_digests


def test_check_dataset_change():
    """Expect ``check_dataset_change`` to be truthy after a file is modified.

    The directory hashes are recorded under ``preprocessed/`` first, then one
    of the hashed files is rewritten with different content.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Build the directory and record its hash values.
        first_path = os.path.join(tmp_dir, "test_1.txt")
        second_path = os.path.join(tmp_dir, "test_2.txt")
        with open(first_path, "w") as handle:
            handle.write("test content")
        with open(second_path, "w") as handle:
            handle.write("test contents of directory")
        recorded_hashes = internal.calculate_dir_hash(tmp_dir, hash_algo="md5")

        preprocessed_dir = os.path.join(tmp_dir, "preprocessed")
        hash_record_path = os.path.join(
            preprocessed_dir, "dataset_hash_value.txt"
        )
        os.makedirs(preprocessed_dir, exist_ok=True)
        with open(hash_record_path, "w") as handle:
            serialized = json.dumps(recorded_hashes, indent=4)
            handle.write(serialized)

        # Change the content of one file after the hashes were recorded.
        with open(second_path, "w") as handle:
            handle.write("test contents of directory changed")

        assert internal.check_dataset_change(tmp_dir, "preprocessed")