Unverified commit 2c64ae03, authored by keli-wen, committed by GitHub


[Graphbolt] Enhance preprocessing: Ensure relative paths in YAML and convert to absolute paths before loading. (#6110)
parent f0189a4f
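
The change in a nutshell: preprocess_ondisk_dataset now writes dataset-relative paths into preprocessed/metadata.yaml, and OnDiskDataset joins them back onto the dataset root before loading. Below is a minimal sketch of that convention, not the library code itself; the dataset root is hypothetical, and only os.path.join and the preprocessed/ layout come from the diff that follows.

    import os

    # Paths persisted in preprocessed/metadata.yaml stay *relative* to the
    # dataset root; they are resolved to absolute paths only at load time.
    dataset_dir = "/data/graphbolt_test"  # hypothetical dataset root
    stored_path = os.path.join("preprocessed", "csc_sampling_graph.tar")

    # Load-time resolution, as the diff does with os.path.join.
    absolute_path = os.path.join(dataset_dir, stored_path)
    print(absolute_path)  # /data/graphbolt_test/preprocessed/csc_sampling_graph.tar

    # Because only relative paths are persisted, the dataset directory can be
    # moved or renamed without invalidating metadata.yaml.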
@@ -76,12 +76,13 @@ def preprocess_ondisk_dataset(dataset_dir: str) -> str:
     )
     # 0. Check if the dataset is already preprocessed.
-    if os.path.exists(os.path.join(dataset_dir, "preprocessed/metadata.yaml")):
+    preprocess_metadata_path = os.path.join("preprocessed", "metadata.yaml")
+    if os.path.exists(os.path.join(dataset_dir, preprocess_metadata_path)):
         print("The dataset is already preprocessed.")
-        return os.path.join(dataset_dir, "preprocessed/metadata.yaml")
+        return os.path.join(dataset_dir, preprocess_metadata_path)
     print("Start to preprocess the on-disk dataset.")
-    processed_dir_prefix = os.path.join(dataset_dir, "preprocessed")
+    processed_dir_prefix = "preprocessed"
     # Check if the metadata.yaml exists.
     metadata_file_path = os.path.join(dataset_dir, "metadata.yaml")
@@ -93,7 +94,7 @@ def preprocess_ondisk_dataset(dataset_dir: str) -> str:
         input_config = yaml.safe_load(f)
     # 1. Make `processed_dir_abs` directory if it does not exist.
-    os.makedirs(processed_dir_prefix, exist_ok=True)
+    os.makedirs(os.path.join(dataset_dir, processed_dir_prefix), exist_ok=True)
     output_config = deepcopy(input_config)
     # 2. Load the edge data and create a DGLGraph.
@@ -161,7 +162,11 @@ def preprocess_ondisk_dataset(dataset_dir: str) -> str:
     )
     save_csc_sampling_graph(
-        csc_sampling_graph, output_config["graph_topology"]["path"]
+        csc_sampling_graph,
+        os.path.join(
+            dataset_dir,
+            output_config["graph_topology"]["path"],
+        ),
     )
     del output_config["graph"]
@@ -177,16 +182,16 @@ def preprocess_ondisk_dataset(dataset_dir: str) -> str:
         )
         _copy_or_convert_data(
             os.path.join(dataset_dir, feature["path"]),
-            out_feature["path"],
+            os.path.join(dataset_dir, out_feature["path"]),
             feature["format"],
             out_feature["format"],
             feature["in_memory"],
         )
     # 7. Save tasks and train/val/test split according to the output_config.
-    if input_config.get("task", None):
+    if input_config.get("tasks", None):
         for input_task, output_task in zip(
-            input_config["task"], output_config["task"]
+            input_config["tasks"], output_config["tasks"]
         ):
             for set_name in ["train_set", "validation_set", "test_set"]:
                 if set_name not in input_task:
@@ -205,13 +210,13 @@ def preprocess_ondisk_dataset(dataset_dir: str) -> str:
                 )
                 _copy_or_convert_data(
                     os.path.join(dataset_dir, input_data["path"]),
-                    output_data["path"],
+                    os.path.join(dataset_dir, output_data["path"]),
                     input_data["format"],
                     output_data["format"],
                 )
     # 8. Save the output_config.
-    output_config_path = os.path.join(dataset_dir, "preprocessed/metadata.yaml")
+    output_config_path = os.path.join(dataset_dir, preprocess_metadata_path)
     with open(output_config_path, "w") as f:
         yaml.dump(output_config, f)
     print("Finish preprocessing the on-disk dataset.")
@@ -337,15 +342,39 @@ class OnDiskDataset(Dataset):
     def __init__(self, path: str) -> None:
         # Always call the preprocess function first. If already preprocessed,
         # the function will return the original path directly.
+        self.dataset_dir = path
         path = preprocess_ondisk_dataset(path)
         with open(path) as f:
-            yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
-        self._meta = OnDiskMetaData(**yaml_data)
+            self.yaml_data = yaml.load(f, Loader=yaml.loader.SafeLoader)
+        self._convert_yaml_path_to_absolute_path()
+        self._meta = OnDiskMetaData(**self.yaml_data)
         self._dataset_name = self._meta.dataset_name
         self._graph = self._load_graph(self._meta.graph_topology)
         self._feature = TorchBasedFeatureStore(self._meta.feature_data)
         self._tasks = self._init_tasks(self._meta.tasks)
 
+    def _convert_yaml_path_to_absolute_path(self):
+        """Convert the paths in the YAML file to absolute paths."""
+        if "graph_topology" in self.yaml_data:
+            self.yaml_data["graph_topology"]["path"] = os.path.join(
+                self.dataset_dir, self.yaml_data["graph_topology"]["path"]
+            )
+        if "feature_data" in self.yaml_data:
+            for feature in self.yaml_data["feature_data"]:
+                feature["path"] = os.path.join(
+                    self.dataset_dir, feature["path"]
+                )
+        if "tasks" in self.yaml_data:
+            for task in self.yaml_data["tasks"]:
+                for set_name in ["train_set", "validation_set", "test_set"]:
+                    if set_name not in task:
+                        continue
+                    for set_per_type in task[set_name]:
+                        for data in set_per_type["data"]:
+                            data["path"] = os.path.join(
+                                self.dataset_dir, data["path"]
+                            )
+
     @property
     def tasks(self) -> List[Task]:
         """Return the tasks."""
......
import os
import re
import tempfile
import unittest
import gb_test_utils as gbt
@@ -1030,7 +1031,6 @@ def test_OnDiskDataset_preprocess_path():
     with tempfile.TemporaryDirectory() as test_dir:
         # All metadata fields are specified.
         dataset_name = "graphbolt_test"
-        num_classes = 10
         yaml_content = f"""
             dataset_name: {dataset_name}
@@ -1063,3 +1063,311 @@ def test_OnDiskDataset_preprocess_path():
             match=r"metadata.yaml does not exist.",
         ):
             _ = gb.OnDiskDataset(fake_dir)
+
+
+@unittest.skipIf(os.name == "nt", "Skip on Windows")
+def test_OnDiskDataset_preprocess_yaml_content_unix():
+    """Test if the preprocessed metadata.yaml is correct."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        # All metadata fields are specified.
+        dataset_name = "graphbolt_test"
+        num_nodes = 4000
+        num_edges = 20000
+        num_classes = 10
+
+        # Generate random edges.
+        nodes = np.repeat(np.arange(num_nodes), 5)
+        neighbors = np.random.randint(0, num_nodes, size=(num_edges))
+        edges = np.stack([nodes, neighbors], axis=1)
+        # Write into edges/edge.csv
+        os.makedirs(os.path.join(test_dir, "edges/"), exist_ok=True)
+        edges = pd.DataFrame(edges, columns=["src", "dst"])
+        edges.to_csv(
+            os.path.join(test_dir, "edges/edge.csv"),
+            index=False,
+            header=False,
+        )
+
+        # Generate random graph edge-feats.
+        edge_feats = np.random.rand(num_edges, 5)
+        os.makedirs(os.path.join(test_dir, "data/"), exist_ok=True)
+        np.save(os.path.join(test_dir, "data/edge-feat.npy"), edge_feats)
+
+        # Generate random node-feats.
+        node_feats = np.random.rand(num_nodes, 10)
+        np.save(os.path.join(test_dir, "data/node-feat.npy"), node_feats)
+
+        # Generate train/test/valid set.
+        os.makedirs(os.path.join(test_dir, "set/"), exist_ok=True)
+        train_pairs = (np.arange(1000), np.arange(1000, 2000))
+        train_labels = np.random.randint(0, 10, size=1000)
+        train_data = np.vstack([train_pairs, train_labels]).T
+        train_path = os.path.join(test_dir, "set/train.npy")
+        np.save(train_path, train_data)
+
+        validation_pairs = (np.arange(1000, 2000), np.arange(2000, 3000))
+        validation_labels = np.random.randint(0, 10, size=1000)
+        validation_data = np.vstack([validation_pairs, validation_labels]).T
+        validation_path = os.path.join(test_dir, "set/validation.npy")
+        np.save(validation_path, validation_data)
+
+        test_pairs = (np.arange(2000, 3000), np.arange(3000, 4000))
+        test_labels = np.random.randint(0, 10, size=1000)
+        test_data = np.vstack([test_pairs, test_labels]).T
+        test_path = os.path.join(test_dir, "set/test.npy")
+        np.save(test_path, test_data)
+
+        yaml_content = f"""
+            dataset_name: {dataset_name}
+            graph: # graph structure and required attributes.
+              nodes:
+                - num: {num_nodes}
+              edges:
+                - format: csv
+                  path: edges/edge.csv
+            feature_data:
+              - domain: edge
+                type: null
+                name: feat
+                format: numpy
+                in_memory: true
+                path: data/edge-feat.npy
+            feature_data:
+              - domain: node
+                type: null
+                name: feat
+                format: numpy
+                in_memory: false
+                path: data/node-feat.npy
+            tasks:
+              - name: node_classification
+                num_classes: {num_classes}
+                train_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: set/train.npy
+                validation_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: set/validation.npy
+                test_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: set/test.npy
+        """
+        yaml_file = os.path.join(test_dir, "metadata.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+
+        preprocessed_metadata_path = gb.preprocess_ondisk_dataset(test_dir)
+        with open(preprocessed_metadata_path, "r") as f:
+            yaml_data = yaml.safe_load(f)
+
+        target_yaml_content = f"""
+            dataset_name: {dataset_name}
+            graph_topology:
+              type: CSCSamplingGraph
+              path: preprocessed/csc_sampling_graph.tar
+            feature_data:
+              - domain: node
+                type: null
+                name: feat
+                format: numpy
+                in_memory: false
+                path: preprocessed/data/node-feat.npy
+            tasks:
+              - name: node_classification
+                num_classes: {num_classes}
+                train_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: preprocessed/set/train.npy
+                validation_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: preprocessed/set/validation.npy
+                test_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: preprocessed/set/test.npy
+        """
+        target_yaml_data = yaml.safe_load(target_yaml_content)
+        # Check yaml content.
+        assert (
+            yaml_data == target_yaml_data
+        ), "The preprocessed metadata.yaml is not correct."
+
+        # Check file existence.
+        assert os.path.exists(
+            os.path.join(test_dir, yaml_data["graph_topology"]["path"])
+        )
+        assert os.path.exists(
+            os.path.join(test_dir, yaml_data["feature_data"][0]["path"])
+        )
+        for set_name in ["train_set", "validation_set", "test_set"]:
+            assert os.path.exists(
+                os.path.join(
+                    test_dir,
+                    yaml_data["tasks"][0][set_name][0]["data"][0]["path"],
+                )
+            )
+
+
+@unittest.skipIf(os.name != "nt", "Skip on Unix")
+def test_OnDiskDataset_preprocess_yaml_content_windows():
+    """Test if the preprocessed metadata.yaml is correct."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        # All metadata fields are specified.
+        dataset_name = "graphbolt_test"
+        num_nodes = 4000
+        num_edges = 20000
+        num_classes = 10
+
+        # Generate random edges.
+        nodes = np.repeat(np.arange(num_nodes), 5)
+        neighbors = np.random.randint(0, num_nodes, size=(num_edges))
+        edges = np.stack([nodes, neighbors], axis=1)
+        # Write into edges/edge.csv
+        os.makedirs(os.path.join(test_dir, "edges\\"), exist_ok=True)
+        edges = pd.DataFrame(edges, columns=["src", "dst"])
+        edges.to_csv(
+            os.path.join(test_dir, "edges\\edge.csv"),
+            index=False,
+            header=False,
+        )
+
+        # Generate random graph edge-feats.
+        edge_feats = np.random.rand(num_edges, 5)
+        os.makedirs(os.path.join(test_dir, "data\\"), exist_ok=True)
+        np.save(os.path.join(test_dir, "data\\edge-feat.npy"), edge_feats)
+
+        # Generate random node-feats.
+        node_feats = np.random.rand(num_nodes, 10)
+        np.save(os.path.join(test_dir, "data\\node-feat.npy"), node_feats)
+
+        # Generate train/test/valid set.
+        os.makedirs(os.path.join(test_dir, "set\\"), exist_ok=True)
+        train_pairs = (np.arange(1000), np.arange(1000, 2000))
+        train_labels = np.random.randint(0, 10, size=1000)
+        train_data = np.vstack([train_pairs, train_labels]).T
+        train_path = os.path.join(test_dir, "set\\train.npy")
+        np.save(train_path, train_data)
+
+        validation_pairs = (np.arange(1000, 2000), np.arange(2000, 3000))
+        validation_labels = np.random.randint(0, 10, size=1000)
+        validation_data = np.vstack([validation_pairs, validation_labels]).T
+        validation_path = os.path.join(test_dir, "set\\validation.npy")
+        np.save(validation_path, validation_data)
+
+        test_pairs = (np.arange(2000, 3000), np.arange(3000, 4000))
+        test_labels = np.random.randint(0, 10, size=1000)
+        test_data = np.vstack([test_pairs, test_labels]).T
+        test_path = os.path.join(test_dir, "set\\test.npy")
+        np.save(test_path, test_data)
+
+        yaml_content = f"""
+            dataset_name: {dataset_name}
+            graph: # graph structure and required attributes.
+              nodes:
+                - num: {num_nodes}
+              edges:
+                - format: csv
+                  path: edges\\edge.csv
+            feature_data:
+              - domain: edge
+                type: null
+                name: feat
+                format: numpy
+                in_memory: true
+                path: data\\edge-feat.npy
+            feature_data:
+              - domain: node
+                type: null
+                name: feat
+                format: numpy
+                in_memory: false
+                path: data\\node-feat.npy
+            tasks:
+              - name: node_classification
+                num_classes: {num_classes}
+                train_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: set\\train.npy
+                validation_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: set\\validation.npy
+                test_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: set\\test.npy
+        """
+        yaml_file = os.path.join(test_dir, "metadata.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+
+        preprocessed_metadata_path = gb.preprocess_ondisk_dataset(test_dir)
+        with open(preprocessed_metadata_path, "r") as f:
+            yaml_data = yaml.safe_load(f)
+
+        target_yaml_content = f"""
+            dataset_name: {dataset_name}
+            graph_topology:
+              type: CSCSamplingGraph
+              path: preprocessed\\csc_sampling_graph.tar
+            feature_data:
+              - domain: node
+                type: null
+                name: feat
+                format: numpy
+                in_memory: false
+                path: preprocessed\\data\\node-feat.npy
+            tasks:
+              - name: node_classification
+                num_classes: {num_classes}
+                train_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: preprocessed\\set\\train.npy
+                validation_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: preprocessed\\set\\validation.npy
+                test_set:
+                  - type_name: null
+                    data:
+                      - format: numpy
+                        path: preprocessed\\set\\test.npy
+        """
+        target_yaml_data = yaml.safe_load(target_yaml_content)
+        # Check yaml content.
+        assert (
+            yaml_data == target_yaml_data
+        ), "The preprocessed metadata.yaml is not correct."
+
+        # Check file existence.
+        assert os.path.exists(
+            os.path.join(test_dir, yaml_data["graph_topology"]["path"])
+        )
+        assert os.path.exists(
+            os.path.join(test_dir, yaml_data["feature_data"][0]["path"])
+        )
+        for set_name in ["train_set", "validation_set", "test_set"]:
+            assert os.path.exists(
+                os.path.join(
+                    test_dir,
+                    yaml_data["tasks"][0][set_name][0]["data"][0]["path"],
+                )
+            )