Unverified Commit 40816f6e authored by yxy235, committed by GitHub

[GraphBolt] Automatically force preprocess on-disk dataset. (#6937)


Co-authored-by: Ubuntu <ubuntu@ip-172-31-0-133.us-west-2.compute.internal>
parent 9a8aa8fa
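In short, the patch changes the default of `force_preprocess` from `False` to `None`; when it is left as `None` and a preprocessed `metadata.yaml` already exists, preprocessing is re-run only if the requested `include_original_edge_id` differs from the recorded one, or if the on-disk files have changed since the last run (detected via a stored directory hash). A minimal sketch of that decision rule, with the real checker passed in (the helper name `should_force_preprocess` is hypothetical; in the patch the logic lives inline in `preprocess_ondisk_dataset` and in `check_dataset_change`):

import os

import yaml


def should_force_preprocess(
    dataset_dir, processed_dir_prefix, include_original_edge_id, check_dataset_change
):
    """Hypothetical helper mirroring the auto-decision introduced by this commit."""
    metadata = os.path.join(dataset_dir, processed_dir_prefix, "metadata.yaml")
    if not os.path.exists(metadata):
        # Nothing has been preprocessed yet, so there is nothing to force.
        return False
    with open(metadata, "r") as f:
        preprocess_config = yaml.safe_load(f)
    if (
        preprocess_config.get("include_original_edge_id", None)
        != include_original_edge_id
    ):
        # The requested edge-ID setting differs from the preprocessed copy.
        return True
    # Otherwise re-preprocess only if the raw dataset files changed on disk.
    return check_dataset_change(dataset_dir, processed_dir_prefix)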
"""GraphBolt OnDiskDataset.""" """GraphBolt OnDiskDataset."""
import json
import os import os
import shutil import shutil
from copy import deepcopy from copy import deepcopy
...@@ -15,6 +16,8 @@ from ...data.utils import download, extract_archive ...@@ -15,6 +16,8 @@ from ...data.utils import download, extract_archive
from ..base import etype_str_to_tuple from ..base import etype_str_to_tuple
from ..dataset import Dataset, Task from ..dataset import Dataset, Task
from ..internal import ( from ..internal import (
calculate_dir_hash,
check_dataset_change,
copy_or_convert_data, copy_or_convert_data,
get_attributes, get_attributes,
read_data, read_data,
...@@ -37,7 +40,7 @@ __all__ = ["OnDiskDataset", "preprocess_ondisk_dataset", "BuiltinDataset"] ...@@ -37,7 +40,7 @@ __all__ = ["OnDiskDataset", "preprocess_ondisk_dataset", "BuiltinDataset"]
def preprocess_ondisk_dataset( def preprocess_ondisk_dataset(
dataset_dir: str, dataset_dir: str,
include_original_edge_id: bool = False, include_original_edge_id: bool = False,
force_preprocess: bool = False, force_preprocess: bool = None,
) -> str: ) -> str:
"""Preprocess the on-disk dataset. Parse the input config file, """Preprocess the on-disk dataset. Parse the input config file,
load the data, and save the data in the format that GraphBolt supports. load the data, and save the data in the format that GraphBolt supports.
...@@ -72,6 +75,20 @@ def preprocess_ondisk_dataset( ...@@ -72,6 +75,20 @@ def preprocess_ondisk_dataset(
processed_dir_prefix, "metadata.yaml" processed_dir_prefix, "metadata.yaml"
) )
if os.path.exists(os.path.join(dataset_dir, preprocess_metadata_path)): if os.path.exists(os.path.join(dataset_dir, preprocess_metadata_path)):
if force_preprocess is None:
with open(
os.path.join(dataset_dir, preprocess_metadata_path), "r"
) as f:
preprocess_config = yaml.safe_load(f)
if (
preprocess_config.get("include_original_edge_id", None)
== include_original_edge_id
):
force_preprocess = check_dataset_change(
dataset_dir, processed_dir_prefix
)
else:
force_preprocess = True
        if force_preprocess:
            shutil.rmtree(os.path.join(dataset_dir, processed_dir_prefix))
            print(
@@ -180,7 +197,10 @@ def preprocess_ondisk_dataset(
        g, is_homogeneous, include_original_edge_id
    )
    # 5. Record value of include_original_edge_id.
    output_config["include_original_edge_id"] = include_original_edge_id
    # 6. Save the FusedCSCSamplingGraph and modify the output_config.
    output_config["graph_topology"] = {}
    output_config["graph_topology"]["type"] = "FusedCSCSamplingGraph"
    output_config["graph_topology"]["path"] = os.path.join(
@@ -196,7 +216,7 @@ def preprocess_ondisk_dataset(
    )
    del output_config["graph"]
    # 7. Load the node/edge features and do necessary conversion.
    if input_config.get("feature_data", None):
        for feature, out_feature in zip(
            input_config["feature_data"], output_config["feature_data"]
@@ -218,7 +238,7 @@ def preprocess_ondisk_dataset(
                is_feature=True,
            )
    # 8. Save tasks and train/val/test split according to the output_config.
    if input_config.get("tasks", None):
        for input_task, output_task in zip(
            input_config["tasks"], output_config["tasks"]
@@ -245,13 +265,24 @@ def preprocess_ondisk_dataset(
                        output_data["format"],
                    )
    # 9. Save the output_config.
    output_config_path = os.path.join(dataset_dir, preprocess_metadata_path)
    with open(output_config_path, "w") as f:
        yaml.dump(output_config, f)
    print("Finish preprocessing the on-disk dataset.")
    # 10. Calculate and save the hash value of the dataset directory.
    hash_value_file = "dataset_hash_value.txt"
    hash_value_file_path = os.path.join(
        dataset_dir, processed_dir_prefix, hash_value_file
    )
    if os.path.exists(hash_value_file_path):
        os.remove(hash_value_file_path)
    dir_hash = calculate_dir_hash(dataset_dir)
    with open(hash_value_file_path, "w") as f:
        f.write(json.dumps(dir_hash, indent=4))
    # 11. Return the absolute path of the preprocessing yaml file.
    return output_config_path
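For reference, the `dataset_hash_value.txt` written in step 10 is simply the JSON-serialized mapping returned by `calculate_dir_hash`, and `check_dataset_change` (added in the utilities below) reads it back verbatim for comparison. A small round-trip sketch with a made-up path and digest:

import json
import os
import tempfile

# Sketch: persist a directory-hash mapping and read it back, mirroring how
# preprocess_ondisk_dataset and check_dataset_change cooperate.
dir_hash = {"data/edge-feat.npy": "d41d8cd98f00b204e9800998ecf8427e"}  # made-up digest
with tempfile.TemporaryDirectory() as root:
    hash_path = os.path.join(root, "dataset_hash_value.txt")
    with open(hash_path, "w") as f:
        f.write(json.dumps(dir_hash, indent=4))
    with open(hash_path, "r") as f:
        # An unchanged mapping means no forced re-preprocess on the next load.
        assert json.load(f) == dir_hash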
@@ -398,7 +429,7 @@ class OnDiskDataset(Dataset):
        self,
        path: str,
        include_original_edge_id: bool = False,
        force_preprocess: bool = None,
    ) -> None:
        # Always call the preprocess function first. If already preprocessed,
        # the function will return the original path directly.
@@ -720,7 +751,7 @@ class BuiltinDataset(OnDiskDataset):
            download(url, path=zip_file_path)
            extract_archive(zip_file_path, root, overwrite=True)
            os.remove(zip_file_path)
        super().__init__(dataset_dir, force_preprocess=False)
def _ondisk_task_str(task: OnDiskTask) -> str:
...
"""Utility functions for GraphBolt.""" """Utility functions for GraphBolt."""
import hashlib
import json
import os import os
import shutil import shutil
from typing import List, Union
import numpy as np import numpy as np
import pandas as pd import pandas as pd
...@@ -145,3 +148,51 @@ def read_edges(dataset_dir, edge_fmt, edge_path): ...@@ -145,3 +148,51 @@ def read_edges(dataset_dir, edge_fmt, edge_path):
) )
src, dst = edge_data["src"].to_numpy(), edge_data["dst"].to_numpy() src, dst = edge_data["src"].to_numpy(), edge_data["dst"].to_numpy()
return (src, dst) return (src, dst)
def calculate_file_hash(file_path, hash_algo="md5"):
"""Calculate the hash value of a file."""
hash_algos = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"]
if hash_algo in hash_algos:
hash_obj = getattr(hashlib, hash_algo)()
else:
raise ValueError(
f"Hash algorithm must be one of: {hash_algos}, but got `{hash_algo}`."
)
with open(file_path, "rb") as file:
for chunk in iter(lambda: file.read(4096), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()
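A quick sanity check of `calculate_file_hash`, assuming the function above is in scope (in the library it is reached through `dgl.graphbolt.internal`, which is how the tests at the bottom of this diff call `internal.calculate_file_hash`); the expected digest is the same MD5 of "test content" asserted in those tests:

import hashlib
import os
import tempfile

# Sketch: calculate_file_hash should agree with hashlib applied to the same bytes.
with tempfile.TemporaryDirectory() as root:
    path = os.path.join(root, "test.txt")
    with open(path, "w") as f:
        f.write("test content")
    digest = calculate_file_hash(path, hash_algo="md5")
    assert digest == hashlib.md5(b"test content").hexdigest()
    # The tests below pin this value to "9473fdd0d880a43c21b7778d34872157".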
def calculate_dir_hash(
dir_path, hash_algo="md5", ignore: Union[str, List[str]] = None
):
"""Calculte the hash values of all files under the directory."""
hashes = {}
for dirpath, _, filenames in os.walk(dir_path):
for filename in filenames:
if ignore and filename in ignore:
continue
filepath = os.path.join(dirpath, filename)
file_hash = calculate_file_hash(filepath, hash_algo=hash_algo)
hashes[filepath] = file_hash
return hashes
def check_dataset_change(dataset_dir, processed_dir):
"""Check whether dataset has been changed by checking its hash value."""
hash_value_file = "dataset_hash_value.txt"
hash_value_file_path = os.path.join(
dataset_dir, processed_dir, hash_value_file
)
if not os.path.exists(hash_value_file_path):
return True
with open(hash_value_file_path, "r") as f:
        original_hash_value = json.load(f)
    present_hash_value = calculate_dir_hash(dataset_dir, ignore=hash_value_file)
    if original_hash_value == present_hash_value:
force_preprocess = False
else:
force_preprocess = True
return force_preprocess
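One detail worth calling out: `preprocess_ondisk_dataset` deletes any stale `dataset_hash_value.txt` before hashing, and `check_dataset_change` passes `ignore=hash_value_file`, so the recorded hash file never contributes to either digest. A minimal sketch of that property, assuming `calculate_dir_hash` above is in scope (file names are made up):

import os
import tempfile

# Sketch: hashing a directory while ignoring the hash file itself gives the
# same result whether or not dataset_hash_value.txt is present.
with tempfile.TemporaryDirectory() as root:
    with open(os.path.join(root, "nodes.csv"), "w") as f:
        f.write("id\n0\n1\n")
    before = calculate_dir_hash(root, ignore="dataset_hash_value.txt")
    with open(os.path.join(root, "dataset_hash_value.txt"), "w") as f:
        f.write("{}")  # pretend a previous run left a hash file behind
    after = calculate_dir_hash(root, ignore="dataset_hash_value.txt")
    assert before == after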
@@ -32,9 +32,11 @@ def load_dataset(dataset):
    return dataset.load()
def write_yaml_and_load_dataset(yaml_content, dir, force_preprocess=False):
    write_yaml_file(yaml_content, dir)
    return load_dataset(
        gb.OnDiskDataset(dir, force_preprocess=force_preprocess)
    )
def test_OnDiskDataset_TVTSet_exceptions():
@@ -52,7 +54,7 @@ def test_OnDiskDataset_TVTSet_exceptions():
        """
        write_yaml_file(yaml_content, test_dir)
        with pytest.raises(pydantic.ValidationError):
            _ = gb.OnDiskDataset(test_dir, force_preprocess=False).load()
        # Case 2: ``type`` is not specified while multiple TVT sets are
        # specified.
@@ -74,7 +76,7 @@ def test_OnDiskDataset_TVTSet_exceptions():
            AssertionError,
            match=r"Only one TVT set is allowed if type is not specified.",
        ):
            _ = gb.OnDiskDataset(test_dir, force_preprocess=False).load()
def test_OnDiskDataset_multiple_tasks():
@@ -1001,7 +1003,7 @@ def test_OnDiskDataset_Graph_Exceptions():
            pydantic.ValidationError,
            match="1 validation error for OnDiskMetaData",
        ):
            _ = gb.OnDiskDataset(test_dir, force_preprocess=False).load()
def test_OnDiskDataset_Graph_homogeneous():
@@ -1359,6 +1361,7 @@ def test_OnDiskDataset_preprocess_yaml_content_unix():
                      data:
                        - format: numpy
                          path: preprocessed/set/test.npy
            include_original_edge_id: False
        """
        target_yaml_data = yaml.safe_load(target_yaml_content)
        # Check yaml content.
@@ -1513,6 +1516,7 @@ def test_OnDiskDataset_preprocess_yaml_content_windows():
                      data:
                        - format: numpy
                          path: preprocessed\\set\\test.npy
            include_original_edge_id: False
        """
        target_yaml_data = yaml.safe_load(target_yaml_content)
        # Check yaml content.
@@ -1609,6 +1613,119 @@ def test_OnDiskDataset_preprocess_force_preprocess(capsys):
        assert target_yaml_data["tasks"][0]["name"] == "fake_name"
def test_OnDiskDataset_preprocess_auto_force_preprocess(capsys):
"""Test force preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified.
dataset_name = "graphbolt_test"
num_nodes = 4000
num_edges = 20000
num_classes = 10
# Generate random graph.
yaml_content = gbt.random_homo_graphbolt_graph(
test_dir,
dataset_name,
num_nodes,
num_edges,
num_classes,
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
# First preprocess on-disk dataset.
preprocessed_metadata_path = (
gb.ondisk_dataset.preprocess_ondisk_dataset(
test_dir, include_original_edge_id=False
)
)
captured = capsys.readouterr().out.split("\n")
assert captured == [
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
with open(preprocessed_metadata_path, "r") as f:
target_yaml_data = yaml.safe_load(f)
assert target_yaml_data["tasks"][0]["name"] == "link_prediction"
# 1. Change yaml_data.
with open(yaml_file, "r") as f:
yaml_data = yaml.safe_load(f)
yaml_data["tasks"][0]["name"] = "fake_name"
with open(yaml_file, "w") as f:
yaml.dump(yaml_data, f)
preprocessed_metadata_path = (
gb.ondisk_dataset.preprocess_ondisk_dataset(
test_dir, include_original_edge_id=False
)
)
captured = capsys.readouterr().out.split("\n")
assert captured == [
"The on-disk dataset is re-preprocessing, so the existing "
+ "preprocessed dataset has been removed.",
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
with open(preprocessed_metadata_path, "r") as f:
target_yaml_data = yaml.safe_load(f)
assert target_yaml_data["tasks"][0]["name"] == "fake_name"
# 2. Change edge feature.
edge_feats = np.random.rand(num_edges, num_classes)
edge_feat_path = os.path.join("data", "edge-feat.npy")
np.save(os.path.join(test_dir, edge_feat_path), edge_feats)
preprocessed_metadata_path = (
gb.ondisk_dataset.preprocess_ondisk_dataset(
test_dir, include_original_edge_id=False
)
)
captured = capsys.readouterr().out.split("\n")
assert captured == [
"The on-disk dataset is re-preprocessing, so the existing "
+ "preprocessed dataset has been removed.",
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
preprocessed_edge_feat = np.load(
os.path.join(test_dir, "preprocessed", edge_feat_path)
)
        assert np.array_equal(preprocessed_edge_feat, edge_feats)
with open(preprocessed_metadata_path, "r") as f:
target_yaml_data = yaml.safe_load(f)
assert target_yaml_data["include_original_edge_id"] == False
# 3. Change include_original_edge_id.
preprocessed_metadata_path = (
gb.ondisk_dataset.preprocess_ondisk_dataset(
test_dir, include_original_edge_id=True
)
)
captured = capsys.readouterr().out.split("\n")
assert captured == [
"The on-disk dataset is re-preprocessing, so the existing "
+ "preprocessed dataset has been removed.",
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
with open(preprocessed_metadata_path, "r") as f:
target_yaml_data = yaml.safe_load(f)
assert target_yaml_data["include_original_edge_id"] == True
# 4. Change nothing.
preprocessed_metadata_path = (
gb.ondisk_dataset.preprocess_ondisk_dataset(
test_dir, include_original_edge_id=True
)
)
captured = capsys.readouterr().out.split("\n")
assert captured == ["The dataset is already preprocessed.", ""]
@pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_load_name(edge_fmt):
    """Test preprocess of OnDiskDataset."""
@@ -2341,6 +2458,109 @@ def test_OnDiskDataset_force_preprocess(capsys):
        dataset = None
def test_OnDiskDataset_auto_force_preprocess(capsys):
"""Test force preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified.
dataset_name = "graphbolt_test"
num_nodes = 4000
num_edges = 20000
num_classes = 10
# Generate random graph.
yaml_content = gbt.random_homo_graphbolt_graph(
test_dir,
dataset_name,
num_nodes,
num_edges,
num_classes,
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
# First preprocess on-disk dataset.
dataset = gb.OnDiskDataset(
test_dir, include_original_edge_id=False
).load()
captured = capsys.readouterr().out.split("\n")
assert captured == [
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
tasks = dataset.tasks
assert tasks[0].metadata["name"] == "link_prediction"
# 1. Change yaml_data.
with open(yaml_file, "r") as f:
yaml_data = yaml.safe_load(f)
yaml_data["tasks"][0]["name"] = "fake_name"
with open(yaml_file, "w") as f:
yaml.dump(yaml_data, f)
dataset = gb.OnDiskDataset(
test_dir, include_original_edge_id=False
).load()
captured = capsys.readouterr().out.split("\n")
assert captured == [
"The on-disk dataset is re-preprocessing, so the existing "
+ "preprocessed dataset has been removed.",
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
tasks = dataset.tasks
assert tasks[0].metadata["name"] == "fake_name"
# 2. Change edge feature.
edge_feats = np.random.rand(num_edges, num_classes)
edge_feat_path = os.path.join("data", "edge-feat.npy")
np.save(os.path.join(test_dir, edge_feat_path), edge_feats)
dataset = gb.OnDiskDataset(
test_dir, include_original_edge_id=False
).load()
captured = capsys.readouterr().out.split("\n")
assert captured == [
"The on-disk dataset is re-preprocessing, so the existing "
+ "preprocessed dataset has been removed.",
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
assert torch.equal(
dataset.feature.read("edge", None, "feat"),
torch.from_numpy(edge_feats),
)
graph = dataset.graph
assert gb.ORIGINAL_EDGE_ID not in graph.edge_attributes
# 3. Change include_original_edge_id.
dataset = gb.OnDiskDataset(
test_dir, include_original_edge_id=True
).load()
captured = capsys.readouterr().out.split("\n")
assert captured == [
"The on-disk dataset is re-preprocessing, so the existing "
+ "preprocessed dataset has been removed.",
"Start to preprocess the on-disk dataset.",
"Finish preprocessing the on-disk dataset.",
"",
]
graph = dataset.graph
assert gb.ORIGINAL_EDGE_ID in graph.edge_attributes
        # 4. Change nothing.
dataset = gb.OnDiskDataset(
test_dir, include_original_edge_id=True
).load()
captured = capsys.readouterr().out.split("\n")
assert captured == ["The dataset is already preprocessed.", ""]
graph = None
tasks = None
dataset = None
def test_OnDiskTask_repr_homogeneous():
    item_set = gb.ItemSet(
        (torch.arange(0, 5), torch.arange(5, 10)),
...
import json
import os
import re
import tempfile
@@ -200,3 +201,68 @@ def test_read_edges_error():
            ),
        ):
            internal.read_edges(test_dir, "numpy", edge_path)
def test_calculate_file_hash():
with tempfile.TemporaryDirectory() as test_dir:
test_file_path = os.path.join(test_dir, "test.txt")
with open(test_file_path, "w") as file:
file.write("test content")
hash_value = internal.calculate_file_hash(
test_file_path, hash_algo="md5"
)
expected_hash_value = "9473fdd0d880a43c21b7778d34872157"
assert expected_hash_value == hash_value
with pytest.raises(
ValueError,
match=re.escape(
"Hash algorithm must be one of: ['md5', 'sha1', 'sha224', "
+ "'sha256', 'sha384', 'sha512'], but got `fake`."
),
):
hash_value = internal.calculate_file_hash(
test_file_path, hash_algo="fake"
)
def test_calculate_dir_hash():
with tempfile.TemporaryDirectory() as test_dir:
test_file_path_1 = os.path.join(test_dir, "test_1.txt")
test_file_path_2 = os.path.join(test_dir, "test_2.txt")
with open(test_file_path_1, "w") as file:
file.write("test content")
with open(test_file_path_2, "w") as file:
file.write("test contents of directory")
hash_value = internal.calculate_dir_hash(test_dir, hash_algo="md5")
expected_hash_value = [
"56e708a2bdf92887d4a7f25cbc13c555",
"9473fdd0d880a43c21b7778d34872157",
]
assert len(hash_value) == 2
for val in hash_value.values():
assert val in expected_hash_value
def test_check_dataset_change():
with tempfile.TemporaryDirectory() as test_dir:
# Generate directory and record its hash value.
test_file_path_1 = os.path.join(test_dir, "test_1.txt")
test_file_path_2 = os.path.join(test_dir, "test_2.txt")
with open(test_file_path_1, "w") as file:
file.write("test content")
with open(test_file_path_2, "w") as file:
file.write("test contents of directory")
hash_value = internal.calculate_dir_hash(test_dir, hash_algo="md5")
hash_value_file = "dataset_hash_value.txt"
        hash_value_file_path = os.path.join(
            test_dir, "preprocessed", hash_value_file
        )
        os.makedirs(os.path.join(test_dir, "preprocessed"), exist_ok=True)
        with open(hash_value_file_path, "w") as file:
file.write(json.dumps(hash_value, indent=4))
# Modify the content of a file.
with open(test_file_path_2, "w") as file:
file.write("test contents of directory changed")
assert internal.check_dataset_change(test_dir, "preprocessed")