"git@developer.sourcefind.cn:OpenDAS/bitsandbytes.git" did not exist on "84964db93789c66fbe8b2c150fb1f9f953781137"
Unverified commit 1a67b5b9, authored by yxy235, committed by GitHub

[Misc] Fix bug that changed 1-D task sets when preprocessing feature data. (#6479)


Co-authored-by: Ubuntu <ubuntu@ip-172-31-0-133.us-west-2.compute.internal>
parent 7439b7e7
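In short: the old preprocessing helper reshaped every 1-D numpy array to shape (n, 1). That is the desired behavior for feature data, but it also mangled 1-D train/validation/test item sets. A minimal standalone sketch of the failure mode (plain numpy, not DGL code):

import numpy as np

train_ids = np.array([0, 1, 0])      # a 1-D item set of seed-node IDs
reshaped = train_ids.reshape(-1, 1)  # what the old helper did to *all* 1-D data
assert reshaped.shape == (3, 1)      # the item set is no longer a flat ID list

The fix below threads an is_feature flag through the helper so that only feature tensors are reshaped; item sets are copied through unchanged.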
"""GraphBolt OnDiskDataset.""" """GraphBolt OnDiskDataset."""
import os import os
import shutil
from copy import deepcopy from copy import deepcopy
from typing import Dict, List, Union from typing import Dict, List, Union
...@@ -17,7 +16,7 @@ from ..base import etype_str_to_tuple ...@@ -17,7 +16,7 @@ from ..base import etype_str_to_tuple
from ..dataset import Dataset, Task from ..dataset import Dataset, Task
from ..itemset import ItemSet, ItemSetDict from ..itemset import ItemSet, ItemSetDict
from ..sampling_graph import SamplingGraph from ..sampling_graph import SamplingGraph
from ..utils import get_npy_dim, read_data, save_data from ..utils import copy_or_convert_data, read_data
from .csc_sampling_graph import ( from .csc_sampling_graph import (
CSCSamplingGraph, CSCSamplingGraph,
from_dglgraph, from_dglgraph,
...@@ -35,32 +34,6 @@ from .torch_based_feature_store import TorchBasedFeatureStore ...@@ -35,32 +34,6 @@ from .torch_based_feature_store import TorchBasedFeatureStore
__all__ = ["OnDiskDataset", "preprocess_ondisk_dataset", "BuiltinDataset"] __all__ = ["OnDiskDataset", "preprocess_ondisk_dataset", "BuiltinDataset"]
def _copy_or_convert_data(
input_path,
output_path,
input_format,
output_format="numpy",
in_memory=True,
):
"""Copy or convert the data from input_path to output_path."""
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# If the original format is numpy, just copy the file.
if input_format == "numpy":
# If dim of the data is 1, reshape it to n * 1 and save it to output_path.
if get_npy_dim(input_path) == 1:
data = read_data(input_path, input_format, in_memory)
data = data.reshape(-1, 1)
save_data(data, output_path, output_format)
else:
shutil.copyfile(input_path, output_path)
else:
# If the original format is not numpy, convert it to numpy.
data = read_data(input_path, input_format, in_memory)
if data.dim() == 1:
data = data.reshape(-1, 1)
save_data(data, output_path, output_format)
def preprocess_ondisk_dataset( def preprocess_ondisk_dataset(
dataset_dir: str, include_original_edge_id: bool = False dataset_dir: str, include_original_edge_id: bool = False
) -> str: ) -> str:
...@@ -194,12 +167,13 @@ def preprocess_ondisk_dataset( ...@@ -194,12 +167,13 @@ def preprocess_ondisk_dataset(
out_feature["path"] = os.path.join( out_feature["path"] = os.path.join(
processed_dir_prefix, feature["path"].replace("pt", "npy") processed_dir_prefix, feature["path"].replace("pt", "npy")
) )
_copy_or_convert_data( copy_or_convert_data(
os.path.join(dataset_dir, feature["path"]), os.path.join(dataset_dir, feature["path"]),
os.path.join(dataset_dir, out_feature["path"]), os.path.join(dataset_dir, out_feature["path"]),
feature["format"], feature["format"],
out_feature["format"], out_feature["format"],
feature["in_memory"], feature["in_memory"],
is_feature=True,
) )
# 7. Save tasks and train/val/test split according to the output_config. # 7. Save tasks and train/val/test split according to the output_config.
...@@ -222,7 +196,7 @@ def preprocess_ondisk_dataset( ...@@ -222,7 +196,7 @@ def preprocess_ondisk_dataset(
processed_dir_prefix, processed_dir_prefix,
input_data["path"].replace("pt", "npy"), input_data["path"].replace("pt", "npy"),
) )
_copy_or_convert_data( copy_or_convert_data(
os.path.join(dataset_dir, input_data["path"]), os.path.join(dataset_dir, input_data["path"]),
os.path.join(dataset_dir, output_data["path"]), os.path.join(dataset_dir, output_data["path"]),
input_data["format"], input_data["format"],
......
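Note the asymmetry between the two call sites above: step 6 (feature data) now passes is_feature=True, while step 7 (train/val/test split data) keeps the default is_feature=False, so 1-D item sets are copied through without the (n, 1) reshape.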
"""Utility functions for GraphBolt.""" """Utility functions for GraphBolt."""
import os import os
import shutil
import numpy as np import numpy as np
import torch import torch
...@@ -78,3 +79,33 @@ def get_npy_dim(npy_path): ...@@ -78,3 +79,33 @@ def get_npy_dim(npy_path):
raise ValueError("Invalid file format") raise ValueError("Invalid file format")
return len(shape) return len(shape)
def copy_or_convert_data(
input_path,
output_path,
input_format,
output_format="numpy",
in_memory=True,
is_feature=False,
):
"""Copy or convert the data from input_path to output_path."""
assert (
output_format == "numpy"
), "The output format of the data should be numpy."
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# If the original format is numpy, just copy the file.
if input_format == "numpy":
# If dim of the data is 1, reshape it to n * 1 and save it to output_path.
if is_feature and get_npy_dim(input_path) == 1:
data = read_data(input_path, input_format, in_memory)
data = data.reshape(-1, 1)
save_data(data, output_path, output_format)
else:
shutil.copyfile(input_path, output_path)
else:
# If the original format is not numpy, convert it to numpy.
data = read_data(input_path, input_format, in_memory)
if is_feature and data.dim() == 1:
data = data.reshape(-1, 1)
save_data(data, output_path, output_format)
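A hedged usage sketch of the new helper defined above (it assumes the GraphBolt internals read_data/save_data/get_npy_dim behave as the code implies; the file names here are hypothetical):

import os
import numpy as np
import torch

os.makedirs("out", exist_ok=True)
torch.save(torch.arange(4, dtype=torch.float32), os.path.join("out", "in.pt"))

# Feature data: a 1-D tensor is reshaped to (n, 1) during conversion.
copy_or_convert_data(
    os.path.join("out", "in.pt"),
    os.path.join("out", "feat.npy"),
    "torch",
    is_feature=True,
)
assert np.load(os.path.join("out", "feat.npy")).shape == (4, 1)

# Item-set data (the default, is_feature=False): left 1-D, which is the fix.
copy_or_convert_data(
    os.path.join("out", "in.pt"),
    os.path.join("out", "train.npy"),
    "torch",
)
assert np.load(os.path.join("out", "train.npy")).shape == (4,)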
@@ -1851,37 +1851,106 @@ def test_OnDiskDataset_all_nodes_set_hetero():
     dataset = None


-def test_OnDiskDataset_load_1D_feature():
+@pytest.mark.parametrize("fmt", ["numpy", "torch"])
+def test_OnDiskDataset_load_1D_feature(fmt):
     with tempfile.TemporaryDirectory() as test_dir:
         # All metadata fields are specified.
         dataset_name = "graphbolt_test"
-        num_nodes = 4000
-        num_edges = 20000
+        num_nodes = 4
+        num_edges = 20
         num_classes = 1

-        # Generate random graph.
-        yaml_content = gbt.random_homo_graphbolt_graph(
-            test_dir,
-            dataset_name,
-            num_nodes,
-            num_edges,
-            num_classes,
-        )
+        type_name = "npy" if fmt == "numpy" else "pt"
+
+        # Generate random edges.
+        nodes = np.repeat(np.arange(num_nodes), 5)
+        neighbors = np.random.randint(0, num_nodes, size=(num_edges))
+        edges = np.stack([nodes, neighbors], axis=1)
+        # Write into edges/edge.csv.
+        os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
+        edges = pd.DataFrame(edges, columns=["src", "dst"])
+        edge_path = os.path.join("edges", "edge.csv")
+        edges.to_csv(
+            os.path.join(test_dir, edge_path),
+            index=False,
+            header=False,
+        )
+
+        # Generate random graph edge-feats.
+        edge_feats = np.random.rand(num_edges, 5)
+        os.makedirs(os.path.join(test_dir, "data"), exist_ok=True)
+        edge_feat_path = os.path.join("data", f"edge-feat.{type_name}")
+
+        # Generate random 1-D node-feats.
+        node_feats = np.random.rand(num_nodes)
+        node_feat_path = os.path.join("data", f"node-feat.{type_name}")
+        assert node_feats.ndim == 1
+
+        # Generate 1-D train set.
+        os.makedirs(os.path.join(test_dir, "set"), exist_ok=True)
+        train_path = os.path.join("set", f"train.{type_name}")
+
+        if fmt == "numpy":
+            np.save(os.path.join(test_dir, edge_feat_path), edge_feats)
+            np.save(os.path.join(test_dir, node_feat_path), node_feats)
+            np.save(os.path.join(test_dir, train_path), np.array([0, 1, 0]))
+        else:
+            torch.save(
+                torch.from_numpy(edge_feats),
+                os.path.join(test_dir, edge_feat_path),
+            )
+            torch.save(
+                torch.from_numpy(node_feats),
+                os.path.join(test_dir, node_feat_path),
+            )
+            torch.save(
+                torch.tensor([0, 1, 0]), os.path.join(test_dir, train_path)
+            )
+
+        yaml_content = f"""
+            dataset_name: {dataset_name}
+            graph: # graph structure and required attributes.
+              nodes:
+                - num: {num_nodes}
+              edges:
+                - format: csv
+                  path: {edge_path}
+              feature_data:
+                - domain: edge
+                  type: null
+                  name: feat
+                  format: {fmt}
+                  in_memory: true
+                  path: {edge_feat_path}
+            feature_data:
+              - domain: node
+                type: null
+                name: feat
+                format: {fmt}
+                in_memory: false
+                path: {node_feat_path}
+            tasks:
+              - name: node_classification
+                num_classes: {num_classes}
+                train_set:
+                  - type_name: null
+                    data:
+                      - format: {fmt}
+                        path: {train_path}
+        """
         yaml_file = os.path.join(test_dir, "metadata.yaml")
         with open(yaml_file, "w") as f:
             f.write(yaml_content)

-        with open(yaml_file, "r") as f:
-            input_config = yaml.safe_load(f)
-
-        node_feat = np.load(
-            os.path.join(test_dir, input_config["feature_data"][0]["path"])
-        )
-
         dataset = gb.OnDiskDataset(test_dir).load()
         feature = dataset.feature.read("node", None, "feat")
-        assert torch.equal(torch.from_numpy(node_feat.reshape(-1, 1)), feature)
+        # Test whether the feature has changed.
+        assert torch.equal(torch.from_numpy(node_feats.reshape(-1, 1)), feature)
+        # Test whether the itemsets stay the same.
+        assert torch.equal(
+            dataset.tasks[0].train_set._items[0], torch.tensor([0, 1, 0])
+        )
         dataset = None
-        node_feat = None
+        node_feats = None
         feature = None
...
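The rewritten test no longer relies on gbt.random_homo_graphbolt_graph: it builds a tiny dataset by hand, in both numpy and torch formats, so it controls exactly one 1-D node-feature array and one 1-D train set. That lets it assert two things independently: the 1-D feature is reshaped to (n, 1) on load, and the train-set items [0, 1, 0] survive preprocessing unchanged.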
@@ -97,3 +97,47 @@ def test_get_npy_dim(fmt):
         with pytest.raises(ValueError):
             utils.get_npy_dim(file_name)
         data = None
+
+
+@pytest.mark.parametrize("data_fmt", ["numpy", "torch"])
+@pytest.mark.parametrize("save_fmt", ["numpy", "torch"])
+@pytest.mark.parametrize("is_feature", [True, False])
+def test_copy_or_convert_data(data_fmt, save_fmt, is_feature):
+    with tempfile.TemporaryDirectory() as test_dir:
+        data = np.arange(10)
+        tensor_data = torch.from_numpy(data)
+        in_type_name = "npy" if data_fmt == "numpy" else "pt"
+        input_path = os.path.join(test_dir, f"data.{in_type_name}")
+        out_type_name = "npy" if save_fmt == "numpy" else "pt"
+        output_path = os.path.join(test_dir, f"out_data.{out_type_name}")
+        if data_fmt == "numpy":
+            np.save(input_path, data)
+        else:
+            torch.save(tensor_data, input_path)
+        if save_fmt == "torch":
+            # Any output format other than numpy should be rejected.
+            with pytest.raises(AssertionError):
+                utils.copy_or_convert_data(
+                    input_path,
+                    output_path,
+                    data_fmt,
+                    save_fmt,
+                    is_feature=is_feature,
+                )
+        else:
+            utils.copy_or_convert_data(
+                input_path,
+                output_path,
+                data_fmt,
+                save_fmt,
+                is_feature=is_feature,
+            )
+            # Only feature data is expected to be reshaped to (n, 1).
+            if is_feature:
+                data = data.reshape(-1, 1)
+                tensor_data = tensor_data.reshape(-1, 1)
+            if save_fmt == "numpy":
+                out_data = np.load(output_path)
+                assert (data == out_data).all()
+        data = None
+        tensor_data = None
+        out_data = None
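With the three parametrize decorators this single test expands to eight cases: the save_fmt == "torch" cases exercise the new assertion (only numpy output is supported), and the numpy-output cases check that the (n, 1) reshape is applied exactly when is_feature=True.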