"""GraphBolt OnDiskDataset."""

import os
import shutil
from copy import deepcopy
from typing import Dict, List, Union

import pandas as pd
import torch
import yaml

import dgl

from ..dataset import Dataset, Task
from ..itemset import ItemSet, ItemSetDict
from ..utils import read_data, save_data
from .csc_sampling_graph import (
    CSCSamplingGraph,
    from_dglgraph,
    load_csc_sampling_graph,
    save_csc_sampling_graph,
)
from .ondisk_metadata import (
    OnDiskGraphTopology,
    OnDiskMetaData,
    OnDiskTaskData,
    OnDiskTVTSet,
)
from .torch_based_feature_store import TorchBasedFeatureStore

__all__ = ["OnDiskDataset", "preprocess_ondisk_dataset"]


def _copy_or_convert_data(
    input_path,
    output_path,
    input_format,
    output_format="numpy",
    in_memory=True,
):
    """Copy or convert the data from input_path to output_path."""
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    if input_format == "numpy":
        # If the original format is numpy, just copy the file.
        shutil.copyfile(input_path, output_path)
    else:
        # If the original format is not numpy, convert it to numpy.
        data = read_data(input_path, input_format, in_memory)
        save_data(data, output_path, output_format)


def preprocess_ondisk_dataset(dataset_dir: str) -> str:
    """Preprocess the on-disk dataset. Parse the input config file,
    load the data, and save the data in the format that GraphBolt supports.

    Parameters
    ----------
    dataset_dir : str
        The path to the dataset directory.

    Returns
    -------
    output_config_path : str
        The path to the output config file.
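
    Examples
    --------
    A minimal sketch of an input ``metadata.yaml`` that this function can
    parse; every name and path below is illustrative, not a required value:

    .. code-block:: yaml

        dataset_name: example_dataset
        graph:
          nodes:
            - num: 1000
          edges:
            - path: edges/edge.csv # Headerless CSV of src,dst pairs.
        feature_data:
          - domain: node
            name: feat
            format: numpy
            in_memory: true
            path: data/node-feat.npy
        tasks:
          - name: node_classification
            num_classes: 10
            train_set:
              - type: null # null for a homogeneous graph.
                data:
                  - format: numpy
                    path: set/train.npy

    Preprocessing then boils down to a single call that returns the path of
    the generated config (the directory path is illustrative):

    >>> output_config_path = preprocess_ondisk_dataset("path/to/dataset")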
    """
    # Check if the dataset path is valid.
    if not os.path.exists(dataset_dir):
        raise RuntimeError(f"Invalid dataset path: {dataset_dir}")

    # Check if the dataset_dir is a directory.
    if not os.path.isdir(dataset_dir):
        raise RuntimeError(
            f"The dataset must be a directory, but got {dataset_dir}."
        )

    # 0. Check if the dataset is already preprocessed.
    preprocess_metadata_path = os.path.join("preprocessed", "metadata.yaml")
    if os.path.exists(os.path.join(dataset_dir, preprocess_metadata_path)):
        print("The dataset is already preprocessed.")
        return os.path.join(dataset_dir, preprocess_metadata_path)

    print("Starting to preprocess the on-disk dataset.")
    processed_dir_prefix = "preprocessed"

    # Check if the metadata.yaml exists.
    metadata_file_path = os.path.join(dataset_dir, "metadata.yaml")
    if not os.path.exists(metadata_file_path):
        raise RuntimeError("metadata.yaml does not exist.")

    # Read the input config.
    with open(metadata_file_path, "r") as f:
        input_config = yaml.safe_load(f)

    # 1. Make `processed_dir_prefix` directory if it does not exist.
    os.makedirs(os.path.join(dataset_dir, processed_dir_prefix), exist_ok=True)
    output_config = deepcopy(input_config)

    # 2. Load the edge data and create a DGLGraph.
    if "graph" not in input_config:
        raise RuntimeError("Invalid config: does not contain graph field.")
    is_homogeneous = "type" not in input_config["graph"]["nodes"][0]
    if is_homogeneous:
        # Homogeneous graph.
        num_nodes = input_config["graph"]["nodes"][0]["num"]
        edge_data = pd.read_csv(
            os.path.join(
                dataset_dir, input_config["graph"]["edges"][0]["path"]
            ),
            names=["src", "dst"],
        )
        src, dst = edge_data["src"].to_numpy(), edge_data["dst"].to_numpy()

        g = dgl.graph((src, dst), num_nodes=num_nodes)
    else:
        # Heterogeneous graph.
        # Construct the num nodes dict.
        num_nodes_dict = {}
        for node_info in input_config["graph"]["nodes"]:
            num_nodes_dict[node_info["type"]] = node_info["num"]
        # Construct the data dict.
        data_dict = {}
        for edge_info in input_config["graph"]["edges"]:
            edge_data = pd.read_csv(
                os.path.join(dataset_dir, edge_info["path"]),
                names=["src", "dst"],
            )
            src = torch.tensor(edge_data["src"])
            dst = torch.tensor(edge_data["dst"])
            data_dict[tuple(edge_info["type"].split(":"))] = (src, dst)
        # Construct the heterograph.
        g = dgl.heterograph(data_dict, num_nodes_dict)

    # 3. Load the sampling-related node/edge features and add them to
    # the sampling graph.
    if input_config["graph"].get("feature_data", None):
        for graph_feature in input_config["graph"]["feature_data"]:
            if graph_feature["domain"] == "node":
                node_data = read_data(
                    os.path.join(dataset_dir, graph_feature["path"]),
                    graph_feature["format"],
                    in_memory=graph_feature["in_memory"],
                )
                g.ndata[graph_feature["name"]] = node_data
            if graph_feature["domain"] == "edge":
                edge_data = read_data(
                    os.path.join(dataset_dir, graph_feature["path"]),
                    graph_feature["format"],
                    in_memory=graph_feature["in_memory"],
                )
                g.edata[graph_feature["name"]] = edge_data

    # 4. Convert the DGLGraph to a CSCSamplingGraph.
    csc_sampling_graph = from_dglgraph(g, is_homogeneous)

    # 5. Save the CSCSamplingGraph and modify the output_config.
    output_config["graph_topology"] = {}
    output_config["graph_topology"]["type"] = "CSCSamplingGraph"
    output_config["graph_topology"]["path"] = os.path.join(
        processed_dir_prefix, "csc_sampling_graph.tar"
    )

    save_csc_sampling_graph(
        csc_sampling_graph,
        os.path.join(
            dataset_dir,
            output_config["graph_topology"]["path"],
        ),
    )
    del output_config["graph"]

    # 6. Load the node/edge features and do necessary conversion.
    if input_config.get("feature_data", None):
        for feature, out_feature in zip(
            input_config["feature_data"], output_config["feature_data"]
        ):
            # Always save the feature in numpy format.
            out_feature["format"] = "numpy"
            out_feature["path"] = os.path.join(
                processed_dir_prefix, feature["path"].replace("pt", "npy")
            )
            _copy_or_convert_data(
                os.path.join(dataset_dir, feature["path"]),
                os.path.join(dataset_dir, out_feature["path"]),
                feature["format"],
                out_feature["format"],
                feature["in_memory"],
            )

    # 7. Save tasks and train/val/test split according to the output_config.
    if input_config.get("tasks", None):
        for input_task, output_task in zip(
            input_config["tasks"], output_config["tasks"]
        ):
            for set_name in ["train_set", "validation_set", "test_set"]:
                if set_name not in input_task:
                    continue
                for input_set_per_type, output_set_per_type in zip(
                    input_task[set_name], output_task[set_name]
                ):
                    for input_data, output_data in zip(
                        input_set_per_type["data"], output_set_per_type["data"]
                    ):
                        # Always save the train/val/test set in numpy format.
                        output_data["format"] = "numpy"
                        output_data["path"] = os.path.join(
                            processed_dir_prefix,
                            input_data["path"].replace("pt", "npy"),
                        )
                        _copy_or_convert_data(
                            os.path.join(dataset_dir, input_data["path"]),
                            os.path.join(dataset_dir, output_data["path"]),
                            input_data["format"],
                            output_data["format"],
                        )

    # 8. Save the output_config.
    output_config_path = os.path.join(dataset_dir, preprocess_metadata_path)
    with open(output_config_path, "w") as f:
        yaml.dump(output_config, f)
    print("Finished preprocessing the on-disk dataset.")

    # 9. Return the path of the output config file.
    return output_config_path


class OnDiskTask:
    """An on-disk task.

    An on-disk task is for ``OnDiskDataset``. It contains the metadata and the
    train/val/test sets.
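
    Examples
    --------
    A minimal sketch, assuming ``train_ids``, ``val_ids`` and ``test_ids``
    are pre-built ``ItemSet`` objects (illustrative names):

    >>> task = OnDiskTask({"num_classes": 10}, train_ids, val_ids, test_ids)
    >>> task.metadata
    {'num_classes': 10}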
    """

    def __init__(
        self,
        metadata: Dict,
        train_set: Union[ItemSet, ItemSetDict],
        validation_set: Union[ItemSet, ItemSetDict],
        test_set: Union[ItemSet, ItemSetDict],
    ):
        """Initialize a task.

        Parameters
        ----------
        metadata : Dict
            Metadata.
        train_set : ItemSet or ItemSetDict
            Training set.
        validation_set : ItemSet or ItemSetDict
            Validation set.
        test_set : ItemSet or ItemSetDict
            Test set.
        """
        self._metadata = metadata
        self._train_set = train_set
        self._validation_set = validation_set
        self._test_set = test_set

    @property
    def metadata(self) -> Dict:
        """Return the task metadata."""
        return self._metadata

    @property
    def train_set(self) -> Union[ItemSet, ItemSetDict]:
        """Return the training set."""
        return self._train_set

    @property
    def validation_set(self) -> Union[ItemSet, ItemSetDict]:
        """Return the validation set."""
        return self._validation_set

    @property
    def test_set(self) -> Union[ItemSet, ItemSetDict]:
        """Return the test set."""
        return self._test_set


class OnDiskDataset(Dataset):
    """An on-disk dataset.

    An on-disk dataset is a dataset which reads graph topology, feature data
    and train/validation/test (TVT) sets from disk. Due to limited resources,
    data that is too large to fit into RAM stays on disk, while the rest
    resides in RAM once ``OnDiskDataset`` is initialized. This behavior can be
    controlled by the user via the ``in_memory`` field in the YAML file.

    A full example of the YAML file is as follows:

    .. code-block:: yaml

        dataset_name: graphbolt_test
        graph_topology:
          type: CSCSamplingGraph
          path: graph_topology/csc_sampling_graph.tar
        feature_data:
          - domain: node
            type: paper
            name: feat
            format: numpy
            in_memory: false
            path: node_data/paper-feat.npy
          - domain: edge
            type: "author:writes:paper"
            name: feat
            format: numpy
            in_memory: false
            path: edge_data/author-writes-paper-feat.npy
        tasks:
          - name: "edge_classification"
            num_classes: 10
            train_set:
              - type: paper # could be null for homogeneous graph.
                data: # multiple data sources could be specified.
                  - format: numpy
                    in_memory: true # If not specified, default to true.
                    path: set/paper-train-src.npy
                  - format: numpy
                    in_memory: false
                    path: set/paper-train-dst.npy
            validation_set:
              - type: paper
                data:
                  - format: numpy
                    in_memory: true
                    path: set/paper-validation.npy
            test_set:
              - type: paper
                data:
                  - format: numpy
                    in_memory: true
                    path: set/paper-test.npy

    Parameters
    ----------
    path: str
        The path to the dataset directory.
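
    Examples
    --------
    A minimal usage sketch, assuming ``"path/to/dataset"`` is a directory
    containing a ``metadata.yaml`` that describes the data (the path is
    illustrative):

    >>> dataset = OnDiskDataset("path/to/dataset").load()
    >>> graph = dataset.graph
    >>> feature = dataset.feature
    >>> train_set = dataset.tasks[0].train_set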
    """

    def __init__(self, path: str) -> None:
        # Always call the preprocess function first. If the dataset is already
        # preprocessed, it returns the existing config path directly.
        self._dataset_dir = path
        yaml_path = preprocess_ondisk_dataset(path)
        with open(yaml_path) as f:
            self._yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)

    def _convert_yaml_path_to_absolute_path(self):
        """Convert the paths in the YAML file to absolute paths."""
        if "graph_topology" in self._yaml_data:
            self._yaml_data["graph_topology"]["path"] = os.path.join(
                self._dataset_dir, self._yaml_data["graph_topology"]["path"]
            )
        if "feature_data" in self._yaml_data:
            for feature in self._yaml_data["feature_data"]:
                feature["path"] = os.path.join(
                    self._dataset_dir, feature["path"]
                )
        if "tasks" in self._yaml_data:
            for task in self._yaml_data["tasks"]:
                for set_name in ["train_set", "validation_set", "test_set"]:
                    if set_name not in task:
                        continue
                    for set_per_type in task[set_name]:
                        for data in set_per_type["data"]:
                            data["path"] = os.path.join(
                                self._dataset_dir, data["path"]
                            )

    def load(self):
        """Load the dataset."""
        self._convert_yaml_path_to_absolute_path()
        self._meta = OnDiskMetaData(**self._yaml_data)
        self._dataset_name = self._meta.dataset_name
        self._graph = self._load_graph(self._meta.graph_topology)
        self._feature = TorchBasedFeatureStore(self._meta.feature_data)
        self._tasks = self._init_tasks(self._meta.tasks)
        return self

    @property
    def yaml_data(self) -> Dict:
        """Return the YAML data."""
        return self._yaml_data

    @property
    def tasks(self) -> List[Task]:
        """Return the tasks."""
        return self._tasks

    @property
    def graph(self) -> object:
        """Return the graph."""
        return self._graph

    @property
    def feature(self) -> TorchBasedFeatureStore:
        """Return the feature."""
        return self._feature

    @property
    def dataset_name(self) -> str:
        """Return the dataset name."""
        return self._dataset_name

    def _init_tasks(self, tasks: List[OnDiskTaskData]) -> List[OnDiskTask]:
        """Initialize the tasks."""
        ret = []
        if tasks is None:
            return ret
        for task in tasks:
            ret.append(
                OnDiskTask(
                    task.extra_fields,
                    self._init_tvt_set(task.train_set),
                    self._init_tvt_set(task.validation_set),
                    self._init_tvt_set(task.test_set),
                )
            )
        return ret

    def _load_graph(
        self, graph_topology: OnDiskGraphTopology
    ) -> CSCSamplingGraph:
        """Load the graph topology."""
        if graph_topology is None:
            return None
        if graph_topology.type == "CSCSamplingGraph":
            return load_csc_sampling_graph(graph_topology.path)
        raise NotImplementedError(
            f"Graph topology type {graph_topology.type} is not supported."
        )

    def _init_tvt_set(
        self, tvt_set: List[OnDiskTVTSet]
    ) -> Union[ItemSet, ItemSetDict]:
        """Initialize the TVT set."""
        ret = None
        if (tvt_set is None) or (len(tvt_set) == 0):
            return ret
        if tvt_set[0].type is None:
            assert (
                len(tvt_set) == 1
            ), "Only one TVT set is allowed if type is not specified."
            ret = ItemSet(
                tuple(
                    read_data(data.path, data.format, data.in_memory)
                    for data in tvt_set[0].data
                )
            )
        else:
            data_dict = {}
            for tvt in tvt_set:
                data_dict[tvt.type] = ItemSet(
                    tuple(
                        read_data(data.path, data.format, data.in_memory)
                        for data in tvt.data
                    )
                )
            ret = ItemSetDict(data_dict)
        return ret