Unverified commit 7d43c769 authored by Rhett Ying, committed by GitHub

[GraphBolt] add end2end tests for OnDiskDataset (#6562)
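
A minimal sketch of the end-to-end flow these tests exercise, using only APIs that appear in the diff below (`test_dir` is assumed to already hold a generated graph plus its `metadata.yaml`):

```python
import dgl.graphbolt as gb

# Load the dataset described by test_dir/metadata.yaml.
dataset = gb.OnDiskDataset(test_dir, include_original_edge_id=True).load()
graph = dataset.graph  # a gb.FusedCSCSamplingGraph
task = dataset.tasks[0]

# Build the sampling pipeline over the task's train set and drain it.
datapipe = gb.ItemSampler(task.train_set, batch_size=10)
datapipe = datapipe.sample_neighbor(graph, [-1])  # [-1]: keep all neighbors
datapipe = datapipe.fetch_feature(dataset.feature, node_feature_keys=["feat"])
datapipe = datapipe.to_dgl()
dataloader = gb.MultiProcessDataLoader(datapipe)
for minibatch in dataloader:
    pass  # each minibatch is ready for model training
```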

parent 19096c6a
@@ -104,7 +104,7 @@ def random_homo_graphbolt_graph(
     )
     # Generate random graph edge-feats.
-    edge_feats = np.random.rand(num_edges, 5)
+    edge_feats = np.random.rand(num_edges, num_classes)
     os.makedirs(os.path.join(test_dir, "data"), exist_ok=True)
     edge_feat_path = os.path.join("data", "edge-feat.npy")
     np.save(os.path.join(test_dir, edge_feat_path), edge_feats)
@@ -125,8 +125,7 @@ def random_homo_graphbolt_graph(
         np.arange(each_set_size),
         np.arange(each_set_size, 2 * each_set_size),
     )
-    train_labels = np.random.randint(0, num_classes, size=each_set_size)
-    train_data = np.vstack([train_pairs, train_labels]).T
+    train_data = np.vstack(train_pairs).T.astype(np.int64)
     train_path = os.path.join("set", "train.npy")
     np.save(os.path.join(test_dir, train_path), train_data)
@@ -134,8 +133,7 @@ def random_homo_graphbolt_graph(
         np.arange(each_set_size, 2 * each_set_size),
         np.arange(2 * each_set_size, 3 * each_set_size),
     )
-    validation_labels = np.random.randint(0, num_classes, size=each_set_size)
-    validation_data = np.vstack([validation_pairs, validation_labels]).T
+    validation_data = np.vstack(validation_pairs).T.astype(np.int64)
     validation_path = os.path.join("set", "validation.npy")
     np.save(os.path.join(test_dir, validation_path), validation_data)
@@ -143,8 +141,7 @@ def random_homo_graphbolt_graph(
         np.arange(2 * each_set_size, 3 * each_set_size),
         np.arange(3 * each_set_size, 4 * each_set_size),
     )
-    test_labels = np.random.randint(0, num_classes, size=each_set_size)
-    test_data = np.vstack([test_pairs, test_labels]).T
+    test_data = np.vstack(test_pairs).T.astype(np.int64)
     test_path = os.path.join("set", "test.npy")
     np.save(os.path.join(test_dir, test_path), test_data)
@@ -168,25 +165,147 @@ def random_homo_graphbolt_graph(
             type: null
             name: feat
             format: numpy
-            in_memory: false
+            in_memory: true
             path: {node_feat_path}
+          - domain: edge
+            type: null
+            name: feat
+            format: numpy
+            in_memory: true
+            path: {edge_feat_path}
         tasks:
-          - name: node_classification
+          - name: link_prediction
             num_classes: {num_classes}
             train_set:
-              - type_name: null
+              - type: null
                 data:
-                  - format: numpy
+                  - name: node_pairs
+                    format: numpy
+                    in_memory: true
                     path: {train_path}
             validation_set:
-              - type_name: null
+              - type: null
                 data:
-                  - format: numpy
+                  - name: node_pairs
+                    format: numpy
+                    in_memory: true
                     path: {validation_path}
             test_set:
-              - type_name: null
+              - type: null
                 data:
-                  - format: numpy
+                  - name: node_pairs
+                    format: numpy
+                    in_memory: true
                     path: {test_path}
     """
     return yaml_content
+
+
+def genereate_raw_data_for_hetero_dataset(
+    test_dir, dataset_name, num_nodes, num_edges, num_classes
+):
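+    # Generate raw on-disk data for a toy user/item heterograph: CSV edge
+    # lists per edge type, per-node-type feature tensors saved as .npy,
+    # train/validation/test ID splits over "user" nodes, and a
+    # metadata.yaml that describes them all for OnDiskDataset.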
+    # Generate edges.
+    edges_path = {}
+    for etype, num_edge in num_edges.items():
+        src_ntype, etype_str, dst_ntype = etype
+        src = torch.randint(0, num_nodes[src_ntype], (num_edge,))
+        dst = torch.randint(0, num_nodes[dst_ntype], (num_edge,))
+        # Write into edges/edge.csv
+        os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
+        edges = pd.DataFrame(
+            np.stack([src, dst], axis=1), columns=["src", "dst"]
+        )
+        edge_path = os.path.join("edges", f"{etype_str}.csv")
+        edges.to_csv(
+            os.path.join(test_dir, edge_path),
+            index=False,
+            header=False,
+        )
+        edges_path[etype_str] = edge_path
+
+    # Generate node features.
+    node_feats_path = {}
+    os.makedirs(os.path.join(test_dir, "data"), exist_ok=True)
+    for ntype, num_node in num_nodes.items():
+        node_feat_path = os.path.join("data", f"{ntype}-feat.npy")
+        node_feats = np.random.rand(num_node, num_classes)
+        np.save(os.path.join(test_dir, node_feat_path), node_feats)
+        node_feats_path[ntype] = node_feat_path
+
+    # Generate train/test/valid set.
+    os.makedirs(os.path.join(test_dir, "set"), exist_ok=True)
+    user_ids = np.arange(num_nodes["user"])
+    np.random.shuffle(user_ids)
+    num_train = int(num_nodes["user"] * 0.6)
+    num_validation = int(num_nodes["user"] * 0.2)
+    num_test = num_nodes["user"] - num_train - num_validation
+    train_path = os.path.join("set", "train.npy")
+    np.save(os.path.join(test_dir, train_path), user_ids[:num_train])
+    validation_path = os.path.join("set", "validation.npy")
+    np.save(
+        os.path.join(test_dir, validation_path),
+        user_ids[num_train : num_train + num_validation],
+    )
+    test_path = os.path.join("set", "test.npy")
+    np.save(
+        os.path.join(test_dir, test_path),
+        user_ids[num_train + num_validation :],
+    )
+
+    yaml_content = f"""
+        dataset_name: {dataset_name}
+        graph: # graph structure and required attributes.
+          nodes:
+            - type: user
+              num: {num_nodes["user"]}
+            - type: item
+              num: {num_nodes["item"]}
+          edges:
+            - type: "user:follow:user"
+              format: csv
+              path: {edges_path["follow"]}
+            - type: "user:click:item"
+              format: csv
+              path: {edges_path["click"]}
+        feature_data:
+          - domain: node
+            type: user
+            name: feat
+            format: numpy
+            in_memory: true
+            path: {node_feats_path["user"]}
+          - domain: node
+            type: item
+            name: feat
+            format: numpy
+            in_memory: true
+            path: {node_feats_path["item"]}
+        tasks:
+          - name: node_classification
+            num_classes: {num_classes}
+            train_set:
+              - type: user
+                data:
+                  - name: seed_nodes
+                    format: numpy
+                    in_memory: true
+                    path: {train_path}
+            validation_set:
+              - type: user
+                data:
+                  - name: seed_nodes
+                    format: numpy
+                    in_memory: true
+                    path: {validation_path}
+            test_set:
+              - type: user
+                data:
+                  - name: seed_nodes
+                    format: numpy
+                    in_memory: true
+                    path: {test_path}
+    """
+    yaml_file = os.path.join(test_dir, "metadata.yaml")
+    with open(yaml_file, "w") as f:
+        f.write(yaml_content)
@@ -1334,17 +1334,17 @@ def test_OnDiskDataset_preprocess_yaml_content_unix():
           - name: node_classification
             num_classes: {num_classes}
             train_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: set/train.npy
             validation_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: set/validation.npy
             test_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: set/test.npy
@@ -1373,17 +1373,17 @@ def test_OnDiskDataset_preprocess_yaml_content_unix():
           - name: node_classification
             num_classes: {num_classes}
             train_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: preprocessed/set/train.npy
             validation_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: preprocessed/set/validation.npy
             test_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: preprocessed/set/test.npy
@@ -1488,17 +1488,17 @@ def test_OnDiskDataset_preprocess_yaml_content_windows():
           - name: node_classification
             num_classes: {num_classes}
             train_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: set\\train.npy
             validation_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: set\\validation.npy
             test_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: set\\test.npy
@@ -1527,17 +1527,17 @@ def test_OnDiskDataset_preprocess_yaml_content_windows():
           - name: node_classification
             num_classes: {num_classes}
             train_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: preprocessed\\set\\train.npy
             validation_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: preprocessed\\set\\validation.npy
             test_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: numpy
                     path: preprocessed\\set\\test.npy
@@ -1631,6 +1631,7 @@ def test_OnDiskDataset_load_feature():
         dataset = gb.OnDiskDataset(test_dir)
         # If `format` is torch and `in_memory` is False, it will
         # raise an AssertionError.
+        dataset.yaml_data["feature_data"][0]["in_memory"] = False
         dataset.yaml_data["feature_data"][0]["format"] = "torch"
         with pytest.raises(
             AssertionError,
@@ -2018,7 +2019,7 @@ def test_OnDiskDataset_load_1D_feature(fmt):
           - name: node_classification
             num_classes: {num_classes}
             train_set:
-              - type_name: null
+              - type: null
                 data:
                   - format: {fmt}
                     path: {train_path}
@@ -2069,3 +2070,143 @@ def test_BuiltinDataset():
             match=rf"Dataset {dataset_name} is not available.*",
         ):
             _ = gb.BuiltinDataset(name=dataset_name, root=test_dir).load()
+
+
+@pytest.mark.parametrize("include_original_edge_id", [True, False])
+def test_OnDiskDataset_homogeneous(include_original_edge_id):
+    """Preprocess and instantiate OnDiskDataset for homogeneous graph."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        # All metadata fields are specified.
+        dataset_name = "graphbolt_test"
+        num_nodes = 4000
+        num_edges = 20000
+        num_classes = 10
+
+        # Generate random graph.
+        yaml_content = gbt.random_homo_graphbolt_graph(
+            test_dir,
+            dataset_name,
+            num_nodes,
+            num_edges,
+            num_classes,
+        )
+        yaml_file = os.path.join(test_dir, "metadata.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+
+        dataset = gb.OnDiskDataset(
+            test_dir, include_original_edge_id=include_original_edge_id
+        ).load()
+
+        assert dataset.dataset_name == dataset_name
+
+        graph = dataset.graph
+        assert isinstance(graph, gb.FusedCSCSamplingGraph)
+        assert graph.total_num_nodes == num_nodes
+        assert graph.total_num_edges == num_edges
+        assert graph.edge_attributes is not None
+        assert (
+            not include_original_edge_id
+        ) or gb.ORIGINAL_EDGE_ID in graph.edge_attributes
+
+        tasks = dataset.tasks
+        assert len(tasks) == 1
+        assert isinstance(tasks[0].train_set, gb.ItemSet)
+        assert isinstance(tasks[0].validation_set, gb.ItemSet)
+        assert isinstance(tasks[0].test_set, gb.ItemSet)
+        assert tasks[0].metadata["num_classes"] == num_classes
+        assert tasks[0].metadata["name"] == "link_prediction"
+
+        assert dataset.feature.size("node", None, "feat")[0] == num_classes
+        assert dataset.feature.size("edge", None, "feat")[0] == num_classes
+        for itemset in [
+            tasks[0].train_set,
+            tasks[0].validation_set,
+            tasks[0].test_set,
+        ]:
+            datapipe = gb.ItemSampler(itemset, batch_size=10)
+            datapipe = datapipe.sample_neighbor(graph, [-1])
+            datapipe = datapipe.fetch_feature(
+                dataset.feature, node_feature_keys=["feat"]
+            )
+            datapipe = datapipe.to_dgl()
+            dataloader = gb.MultiProcessDataLoader(datapipe)
+            for _ in dataloader:
+                pass
+
+        graph = None
+        tasks = None
+        dataset = None
+
+
+@pytest.mark.parametrize("include_original_edge_id", [True, False])
+def test_OnDiskDataset_heterogeneous(include_original_edge_id):
+    """Preprocess and instantiate OnDiskDataset for heterogeneous graph."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        dataset_name = "OnDiskDataset_hetero"
+        num_nodes = {
+            "user": 1000,
+            "item": 2000,
+        }
+        num_edges = {
+            ("user", "follow", "user"): 10000,
+            ("user", "click", "item"): 20000,
+        }
+        num_classes = 10
+        gbt.genereate_raw_data_for_hetero_dataset(
+            test_dir,
+            dataset_name,
+            num_nodes,
+            num_edges,
+            num_classes,
+        )
+
+        dataset = gb.OnDiskDataset(
+            test_dir, include_original_edge_id=include_original_edge_id
+        ).load()
+
+        assert dataset.dataset_name == dataset_name
+
+        graph = dataset.graph
+        assert isinstance(graph, gb.FusedCSCSamplingGraph)
+        assert graph.total_num_nodes == sum(
+            num_node for num_node in num_nodes.values()
+        )
+        assert graph.total_num_edges == sum(
+            num_edge for num_edge in num_edges.values()
+        )
+        assert graph.edge_attributes is not None
+        assert (
+            not include_original_edge_id
+        ) or gb.ORIGINAL_EDGE_ID in graph.edge_attributes
+
+        tasks = dataset.tasks
+        assert len(tasks) == 1
+        assert isinstance(tasks[0].train_set, gb.ItemSetDict)
+        assert isinstance(tasks[0].validation_set, gb.ItemSetDict)
+        assert isinstance(tasks[0].test_set, gb.ItemSetDict)
+        assert tasks[0].metadata["num_classes"] == num_classes
+        assert tasks[0].metadata["name"] == "node_classification"
+
+        assert dataset.feature.size("node", "user", "feat")[0] == num_classes
+        assert dataset.feature.size("node", "item", "feat")[0] == num_classes
+
+        for itemset in [
+            tasks[0].train_set,
+            tasks[0].validation_set,
+            tasks[0].test_set,
+        ]:
+            datapipe = gb.ItemSampler(itemset, batch_size=10)
+            datapipe = datapipe.sample_neighbor(graph, [-1])
+            datapipe = datapipe.fetch_feature(
+                dataset.feature, node_feature_keys={"user": ["feat"]}
+            )
+            datapipe = datapipe.to_dgl()
+            dataloader = gb.MultiProcessDataLoader(datapipe)
+            for _ in dataloader:
+                pass
+
+        graph = None
+        tasks = None
+        dataset = None