Unverified commit b1dd592d authored by keli-wen, committed by GitHub

[Graphbolt] 2-step loading, Add `OnDiskDataset.load()` method to trigger actual loading. (#6142)

parent 58d98e03
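This commit splits `OnDiskDataset` construction into two steps: `__init__` now only runs on-disk preprocessing and parses `metadata.yaml` into `yaml_data`, while the new `load()` method resolves relative paths and materializes the graph topology, feature store, and tasks. Callers can therefore edit the parsed YAML between the two steps, which is what the new tests below exercise. A minimal usage sketch (the dataset path is hypothetical):

import dgl.graphbolt as gb

# Step 1: construct the dataset. This only preprocesses the on-disk
# data and parses metadata.yaml; nothing heavy is read yet.
dataset = gb.OnDiskDataset("/path/to/dataset")

# Optionally tweak the parsed YAML before loading, e.g. task metadata.
dataset.yaml_data["tasks"][0]["num_classes"] = 100

# Step 2: trigger the actual loading. load() returns self for chaining.
dataset.load()
graph = dataset.graph
feature = dataset.feature
train_set = dataset.tasks[0].train_set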
@@ -342,39 +342,48 @@ class OnDiskDataset(Dataset):
def __init__(self, path: str) -> None:
# Always call the preprocess function first. If already preprocessed,
# the function will return the original path directly.
self.dataset_dir = path
path = preprocess_ondisk_dataset(path)
with open(path) as f:
self.yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
self._convert_yaml_path_to_absolute_path()
self._meta = OnDiskMetaData(**self.yaml_data)
self._dataset_name = self._meta.dataset_name
self._graph = self._load_graph(self._meta.graph_topology)
self._feature = TorchBasedFeatureStore(self._meta.feature_data)
self._tasks = self._init_tasks(self._meta.tasks)
self._dataset_dir = path
yaml_path = preprocess_ondisk_dataset(path)
with open(yaml_path) as f:
self._yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
def _convert_yaml_path_to_absolute_path(self):
"""Convert the path in YAML file to absolute path."""
if "graph_topology" in self.yaml_data:
self.yaml_data["graph_topology"]["path"] = os.path.join(
self.dataset_dir, self.yaml_data["graph_topology"]["path"]
if "graph_topology" in self._yaml_data:
self._yaml_data["graph_topology"]["path"] = os.path.join(
self._dataset_dir, self._yaml_data["graph_topology"]["path"]
)
if "feature_data" in self.yaml_data:
for feature in self.yaml_data["feature_data"]:
if "feature_data" in self._yaml_data:
for feature in self._yaml_data["feature_data"]:
feature["path"] = os.path.join(
self.dataset_dir, feature["path"]
self._dataset_dir, feature["path"]
)
if "tasks" in self.yaml_data:
for task in self.yaml_data["tasks"]:
if "tasks" in self._yaml_data:
for task in self._yaml_data["tasks"]:
for set_name in ["train_set", "validation_set", "test_set"]:
if set_name not in task:
continue
for set_per_type in task[set_name]:
for data in set_per_type["data"]:
data["path"] = os.path.join(
self.dataset_dir, data["path"]
self._dataset_dir, data["path"]
)
def load(self):
"""Load the dataset."""
self._convert_yaml_path_to_absolute_path()
self._meta = OnDiskMetaData(**self._yaml_data)
self._dataset_name = self._meta.dataset_name
self._graph = self._load_graph(self._meta.graph_topology)
self._feature = TorchBasedFeatureStore(self._meta.feature_data)
self._tasks = self._init_tasks(self._meta.tasks)
return self
@property
def yaml_data(self) -> Dict:
"""Return the YAML data."""
return self._yaml_data
@property
def tasks(self) -> List[Task]:
"""Return the tasks."""
......
import os
import dgl.graphbolt as gb
import numpy as np
import pandas as pd
import scipy.sparse as sp
import torch
@@ -56,3 +60,107 @@ def random_hetero_graph(num_nodes, num_edges, num_ntypes, num_etypes):
)
type_per_edge = torch.cat(type_per_edge, dim=0)
return (csc_indptr, indices, node_type_offset, type_per_edge, metadata)
def random_homo_graphbolt_graph(
test_dir, dataset_name, num_nodes, num_edges, num_classes
):
"""Generate random graphbolt version homograph"""
# Generate random edges.
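# Note: this construction assumes num_edges == 5 * num_nodes, so that
# `nodes` and `neighbors` have the same length.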
nodes = np.repeat(np.arange(num_nodes), 5)
neighbors = np.random.randint(0, num_nodes, size=(num_edges))
edges = np.stack([nodes, neighbors], axis=1)
# Write into edges/edge.csv.
os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
edges = pd.DataFrame(edges, columns=["src", "dst"])
edge_path = os.path.join("edges", "edge.csv")
edges.to_csv(
os.path.join(test_dir, edge_path),
index=False,
header=False,
)
# Generate random graph edge-feats.
edge_feats = np.random.rand(num_edges, 5)
os.makedirs(os.path.join(test_dir, "data"), exist_ok=True)
edge_feat_path = os.path.join("data", "edge-feat.npy")
np.save(os.path.join(test_dir, edge_feat_path), edge_feats)
# Generate random node-feats.
node_feats = np.random.rand(num_nodes, num_classes)
node_feat_path = os.path.join("data", "node-feat.npy")
np.save(os.path.join(test_dir, node_feat_path), node_feats)
# Generate train/test/valid set.
assert num_nodes % 4 == 0, "num_nodes must be divisible by 4"
each_set_size = num_nodes // 4
os.makedirs(os.path.join(test_dir, "set"), exist_ok=True)
train_pairs = (
np.arange(each_set_size),
np.arange(each_set_size, 2 * each_set_size),
)
train_labels = np.random.randint(0, num_classes, size=each_set_size)
train_data = np.vstack([train_pairs, train_labels]).T
train_path = os.path.join("set", "train.npy")
np.save(os.path.join(test_dir, train_path), train_data)
validation_pairs = (
np.arange(each_set_size, 2 * each_set_size),
np.arange(2 * each_set_size, 3 * each_set_size),
)
validation_labels = np.random.randint(0, num_classes, size=each_set_size)
validation_data = np.vstack([validation_pairs, validation_labels]).T
validation_path = os.path.join("set", "validation.npy")
np.save(os.path.join(test_dir, validation_path), validation_data)
test_pairs = (
np.arange(2 * each_set_size, 3 * each_set_size),
np.arange(3 * each_set_size, 4 * each_set_size),
)
test_labels = np.random.randint(0, num_classes, size=each_set_size)
test_data = np.vstack([test_pairs, test_labels]).T
test_path = os.path.join("set", "test.npy")
np.save(os.path.join(test_dir, test_path), test_data)
yaml_content = f"""
dataset_name: {dataset_name}
graph: # graph structure and required attributes.
  nodes:
    - num: {num_nodes}
  edges:
    - format: csv
      path: {edge_path}
  feature_data:
    - domain: edge
      type: null
      name: feat
      format: numpy
      in_memory: true
      path: {edge_feat_path}
feature_data:
  - domain: node
    type: null
    name: feat
    format: numpy
    in_memory: false
    path: {node_feat_path}
tasks:
  - name: node_classification
    num_classes: {num_classes}
    train_set:
      - type_name: null
        data:
          - format: numpy
            path: {train_path}
    validation_set:
      - type_name: null
        data:
          - format: numpy
            path: {validation_path}
    test_set:
      - type_name: null
        data:
          - format: numpy
            path: {test_path}
"""
return yaml_content
import os
import pickle
import re
import tempfile
import unittest
@@ -7,7 +9,6 @@ import gb_test_utils as gbt
import numpy as np
import pandas as pd
import pydantic
import pytest
import torch
@@ -35,7 +36,7 @@ def test_OnDiskDataset_TVTSet_exceptions():
with open(yaml_file, "w") as f:
f.write(yaml_content)
with pytest.raises(pydantic.ValidationError):
_ = gb.OnDiskDataset(test_dir)
_ = gb.OnDiskDataset(test_dir).load()
# Case 2: ``type`` is not specified while multiple TVT sets are
# specified.
@@ -58,7 +59,7 @@ def test_OnDiskDataset_TVTSet_exceptions():
AssertionError,
match=r"Only one TVT set is allowed if type is not specified.",
):
_ = gb.OnDiskDataset(test_dir)
_ = gb.OnDiskDataset(test_dir).load()
def test_OnDiskDataset_TVTSet_ItemSet_id_label():
@@ -125,7 +126,7 @@ def test_OnDiskDataset_TVTSet_ItemSet_id_label():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
# Verify tasks.
assert len(dataset.tasks) == 1
@@ -174,7 +175,7 @@ def test_OnDiskDataset_TVTSet_ItemSet_id_label():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
assert dataset.tasks[0].train_set is not None
assert dataset.tasks[0].validation_set is None
assert dataset.tasks[0].test_set is None
@@ -258,7 +259,7 @@ def test_OnDiskDataset_TVTSet_ItemSet_node_pair_label():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
# Verify train set.
train_set = dataset.tasks[0].train_set
@@ -373,7 +374,7 @@ def test_OnDiskDataset_TVTSet_ItemSet_node_pair_negs():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
# Verify train set.
train_set = dataset.tasks[0].train_set
@@ -466,7 +467,7 @@ def test_OnDiskDataset_TVTSet_ItemSetDict_id_label():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
# Verify train set.
train_set = dataset.tasks[0].train_set
@@ -571,7 +572,7 @@ def test_OnDiskDataset_TVTSet_ItemSetDict_node_pair_label():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
# Verify train set.
train_set = dataset.tasks[0].train_set
@@ -672,7 +673,7 @@ def test_OnDiskDataset_Feature_heterograph():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
# Verify feature data storage.
feature_data = dataset.feature
@@ -751,7 +752,7 @@ def test_OnDiskDataset_Feature_homograph():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
# Verify feature data storage.
feature_data = dataset.feature
@@ -799,7 +800,7 @@ def test_OnDiskDataset_Graph_Exceptions():
pydantic.ValidationError,
match="1 validation error for OnDiskMetaData",
):
_ = gb.OnDiskDataset(test_dir)
_ = gb.OnDiskDataset(test_dir).load()
def test_OnDiskDataset_Graph_homogeneous():
@@ -821,7 +822,7 @@ def test_OnDiskDataset_Graph_homogeneous():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
graph2 = dataset.graph
assert graph.num_nodes == graph2.num_nodes
@@ -864,7 +865,7 @@ def test_OnDiskDataset_Graph_heterogeneous():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
graph2 = dataset.graph
assert graph.num_nodes == graph2.num_nodes
@@ -891,7 +892,7 @@ def test_OnDiskDataset_Metadata():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
assert dataset.dataset_name == dataset_name
# Only dataset_name is specified.
@@ -902,7 +903,7 @@ def test_OnDiskDataset_Metadata():
with open(yaml_file, "w") as f:
f.write(yaml_content)
dataset = gb.OnDiskDataset(test_dir)
dataset = gb.OnDiskDataset(test_dir).load()
assert dataset.dataset_name == dataset_name
@@ -915,89 +916,14 @@ def test_OnDiskDataset_preprocess_homogeneous():
num_edges = 20000
num_classes = 10
# Generate random edges.
nodes = np.repeat(np.arange(num_nodes), 5)
neighbors = np.random.randint(0, num_nodes, size=(num_edges))
edges = np.stack([nodes, neighbors], axis=1)
# Write into edges/edge.csv.
os.makedirs(os.path.join(test_dir, "edges/"), exist_ok=True)
edges = pd.DataFrame(edges, columns=["src", "dst"])
edges.to_csv(
os.path.join(test_dir, "edges/edge.csv"),
index=False,
header=False,
# Generate random graph.
yaml_content = gbt.random_homo_graphbolt_graph(
test_dir,
dataset_name,
num_nodes,
num_edges,
num_classes,
)
# Generate random graph edge-feats.
edge_feats = np.random.rand(num_edges, 5)
os.makedirs(os.path.join(test_dir, "data/"), exist_ok=True)
np.save(os.path.join(test_dir, "data/edge-feat.npy"), edge_feats)
# Generate random node-feats.
node_feats = np.random.rand(num_nodes, 10)
np.save(os.path.join(test_dir, "data/node-feat.npy"), node_feats)
# Generate train/test/valid set.
os.makedirs(os.path.join(test_dir, "set/"), exist_ok=True)
train_pairs = (np.arange(1000), np.arange(1000, 2000))
train_labels = np.random.randint(0, 10, size=1000)
train_data = np.vstack([train_pairs, train_labels]).T
train_path = os.path.join(test_dir, "set/train.npy")
np.save(train_path, train_data)
validation_pairs = (np.arange(1000, 2000), np.arange(2000, 3000))
validation_labels = np.random.randint(0, 10, size=1000)
validation_data = np.vstack([validation_pairs, validation_labels]).T
validation_path = os.path.join(test_dir, "set/validation.npy")
np.save(validation_path, validation_data)
test_pairs = (np.arange(2000, 3000), np.arange(3000, 4000))
test_labels = np.random.randint(0, 10, size=1000)
test_data = np.vstack([test_pairs, test_labels]).T
test_path = os.path.join(test_dir, "set/test.npy")
np.save(test_path, test_data)
yaml_content = f"""
dataset_name: {dataset_name}
graph: # graph structure and required attributes.
  nodes:
    - num: {num_nodes}
  edges:
    - format: csv
      path: edges/edge.csv
  feature_data:
    - domain: edge
      type: null
      name: feat
      format: numpy
      in_memory: true
      path: data/edge-feat.npy
feature_data:
  - domain: node
    type: null
    name: feat
    format: numpy
    in_memory: false
    path: data/node-feat.npy
tasks:
  - name: node_classification
    num_classes: {num_classes}
    train_set:
      - type_name: null
        data:
          - format: numpy
            path: set/train.npy
    validation_set:
      - type_name: null
        data:
          - format: numpy
            path: set/validation.npy
    test_set:
      - type_name: null
        data:
          - format: numpy
            path: set/test.npy
"""
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
@@ -1371,3 +1297,264 @@ def test_OnDiskDataset_preprocess_yaml_content_windows():
yaml_data["tasks"][0][set_name][0]["data"][0]["path"],
)
)
def test_OnDiskDataset_load_name():
"""Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified.
dataset_name = "graphbolt_test"
num_nodes = 4000
num_edges = 20000
num_classes = 10
# Generate random graph.
yaml_content = gbt.random_homo_graphbolt_graph(
test_dir,
dataset_name,
num_nodes,
num_edges,
num_classes,
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
# Check that modifying the `dataset_name` field takes effect.
dataset = gb.OnDiskDataset(test_dir)
dataset.yaml_data["dataset_name"] = "fake_name"
dataset.load()
assert dataset.dataset_name == "fake_name"
dataset = None
def test_OnDiskDataset_load_feature():
"""Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified.
dataset_name = "graphbolt_test"
num_nodes = 4000
num_edges = 20000
num_classes = 10
# Generate random graph.
yaml_content = gbt.random_homo_graphbolt_graph(
test_dir,
dataset_name,
num_nodes,
num_edges,
num_classes,
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
# Case 1: modify the `in_memory` field.
dataset = gb.OnDiskDataset(test_dir).load()
original_feature_data = dataset.feature
dataset.yaml_data["feature_data"][0]["in_memory"] = True
dataset.load()
modify_feature_data = dataset.feature
# After modifying the `in_memory` field, the feature data should
# be equal.
assert torch.equal(
original_feature_data.read("node", None, "feat"),
modify_feature_data.read("node", None, "feat"),
)
# Case 2: modify the `format` field.
dataset = gb.OnDiskDataset(test_dir)
# If `format` is torch and `in_memory` is False, it will
# raise an AssertionError.
dataset.yaml_data["feature_data"][0]["format"] = "torch"
with pytest.raises(
AssertionError,
match="^Pytorch tensor can only be loaded in memory,",
):
dataset.load()
dataset = gb.OnDiskDataset(test_dir)
dataset.yaml_data["feature_data"][0]["in_memory"] = True
dataset.yaml_data["feature_data"][0]["format"] = "torch"
# If `format` is torch and `in_memory` is True, it will
# raise an UnpicklingError.
with pytest.raises(pickle.UnpicklingError):
dataset.load()
# Case 3: modify the `path` field.
dataset = gb.OnDiskDataset(test_dir)
# Using an invalid path will raise a FileNotFoundError.
dataset.yaml_data["feature_data"][0]["path"] = "fake_path"
with pytest.raises(
FileNotFoundError,
match=r"\[Errno 2\] No such file or directory:",
):
dataset.load()
# Modifying the `path` field to an absolute path should work.
# In os.path.join, if a segment is an absolute path (which
# on Windows requires both a drive and a root), then all
# previous segments are ignored and joining continues from
# the absolute path segment.
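# A tiny illustration with hypothetical paths:
#   os.path.join("/tmp/ds", "/tmp/ds/data/node-feat.npy")
#   returns "/tmp/ds/data/node-feat.npy" on POSIX systems.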
dataset = gb.OnDiskDataset(test_dir).load()
original_feature_data = dataset.feature
dataset.yaml_data["feature_data"][0]["path"] = os.path.join(
test_dir, dataset.yaml_data["feature_data"][0]["path"]
)
dataset.load()
modify_feature_data = dataset.feature
assert torch.equal(
original_feature_data.read("node", None, "feat"),
modify_feature_data.read("node", None, "feat"),
)
original_feature_data = None
modify_feature_data = None
dataset = None
def test_OnDiskDataset_load_graph():
"""Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified.
dataset_name = "graphbolt_test"
num_nodes = 4000
num_edges = 20000
num_classes = 10
# Generate random graph.
yaml_content = gbt.random_homo_graphbolt_graph(
test_dir,
dataset_name,
num_nodes,
num_edges,
num_classes,
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
# Case 1: modify the `type` field.
dataset = gb.OnDiskDataset(test_dir)
dataset.yaml_data["graph_topology"]["type"] = "fake_type"
with pytest.raises(
pydantic.ValidationError,
match="Input should be 'CSCSamplingGraph'",
):
dataset.load()
# Case 2: modify the `path` field.
dataset = gb.OnDiskDataset(test_dir)
dataset.yaml_data["graph_topology"]["path"] = "fake_path"
with pytest.raises(
FileNotFoundError,
match=r"\[Errno 2\] No such file or directory:",
):
dataset.load()
# Modifying the `path` field to an absolute path should work.
# In os.path.join, if a segment is an absolute path (which
# on Windows requires both a drive and a root), then all
# previous segments are ignored and joining continues from
# the absolute path segment.
dataset = gb.OnDiskDataset(test_dir).load()
original_graph = dataset.graph
dataset.yaml_data["graph_topology"]["path"] = os.path.join(
test_dir, dataset.yaml_data["graph_topology"]["path"]
)
dataset.load()
modify_graph = dataset.graph
assert torch.equal(
original_graph.csc_indptr,
modify_graph.csc_indptr,
)
original_graph = None
modify_graph = None
dataset = None
def test_OnDiskDataset_load_tasks():
"""Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified.
dataset_name = "graphbolt_test"
num_nodes = 4000
num_edges = 20000
num_classes = 10
# Generate random graph.
yaml_content = gbt.random_homo_graphbolt_graph(
test_dir,
dataset_name,
num_nodes,
num_edges,
num_classes,
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
# Case 1: modify the `name` field.
dataset = gb.OnDiskDataset(test_dir)
dataset.yaml_data["tasks"][0]["name"] = "fake_name"
dataset.load()
assert dataset.tasks[0].metadata["name"] == "fake_name"
# Case 2: modify the `num_classes` field.
dataset = gb.OnDiskDataset(test_dir)
dataset.yaml_data["tasks"][0]["num_classes"] = 100
dataset.load()
assert dataset.tasks[0].metadata["num_classes"] == 100
# Case 3: modify the `format` field.
dataset = gb.OnDiskDataset(test_dir)
# Change the `format` field to torch.
dataset.yaml_data["tasks"][0]["train_set"][0]["data"][0][
"format"
] = "torch"
with pytest.raises(pickle.UnpicklingError):
dataset.load()
dataset = gb.OnDiskDataset(test_dir)
dataset.yaml_data["tasks"][0]["train_set"][0]["data"][0][
"format"
] = "torch"
# Changing the `in_memory` field to False will also raise an
# UnpicklingError, unlike the `feature_data` case above.
dataset.yaml_data["tasks"][0]["train_set"][0]["data"][0][
"in_memory"
] = False
with pytest.raises(pickle.UnpicklingError):
dataset.load()
# Case 4: modify the `path` field.
dataset = gb.OnDiskDataset(test_dir)
# Using an invalid path will raise a FileNotFoundError.
dataset.yaml_data["tasks"][0]["train_set"][0]["data"][0][
"path"
] = "fake_path"
with pytest.raises(
FileNotFoundError,
match=r"\[Errno 2\] No such file or directory:",
):
dataset.load()
# Modifying the `path` field to an absolute path should work.
# In os.path.join, if a segment is an absolute path (which
# on Windows requires both a drive and a root), then all
# previous segments are ignored and joining continues from
# the absolute path segment.
dataset = gb.OnDiskDataset(test_dir).load()
original_train_set = dataset.tasks[0].train_set._items
dataset.yaml_data["tasks"][0]["train_set"][0]["data"][0][
"path"
] = os.path.join(
test_dir,
dataset.yaml_data["tasks"][0]["train_set"][0]["data"][0]["path"],
)
dataset.load()
modify_train_set = dataset.tasks[0].train_set._items
assert torch.equal(
original_train_set[0],
modify_train_set[0],
)
original_train_set = None
modify_train_set = None
dataset = None