"src/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "2f41fcd986a26661090e5eff6bafe212e354a690"
Unverified commit e5ddc62b authored by Rhett Ying and committed by GitHub

[GraphBolt] add support to generate TVT in ItemSet or ItemSetDict format (#5958)

parent ca36441b
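In short: this commit pluralizes the ``Dataset`` accessors (``train_set`` becomes ``train_sets``, and likewise for validation and test) and teaches ``OnDiskDataset`` to materialize each set declared in the YAML metadata as either a ``gb.ItemSet`` (homogeneous graph, ``type_name: null``) or a ``gb.ItemSetDict`` keyed by node/edge type. A minimal usage sketch of the new API; the metadata file name and the (id, label) field layout are illustrative, not part of the commit:

from dgl import graphbolt as gb

# Hypothetical metadata file following the YAML schema documented below.
dataset = gb.OnDiskDataset("metadata.yaml")

# Each accessor now returns a list with one entry per configured TVT set.
for train_set in dataset.train_sets():
    if isinstance(train_set, gb.ItemSet):
        # Homogeneous set: items are plain tuples such as (id, label).
        for id, label in train_set:
            ...
    else:
        # Heterogeneous set (gb.ItemSetDict): items are single-key dicts
        # such as {"paper": (id, label)}.
        for item in train_set:
            ...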
@@ -7,6 +7,7 @@ import pydantic_yaml
 from .feature_store import FeatureStore
 from .itemset import ItemSet, ItemSetDict
+from .utils import read_data, tensor_to_tuple
 
 __all__ = ["Dataset", "OnDiskDataset"]
@@ -34,16 +35,16 @@ class Dataset:
     generate a subgraph.
     """
 
-    def train_set(self) -> ItemSet or ItemSetDict:
-        """Return the training set."""
+    def train_sets(self) -> List[ItemSet] or List[ItemSetDict]:
+        """Return the training sets."""
         raise NotImplementedError
 
-    def validation_set(self) -> ItemSet or ItemSetDict:
-        """Return the validation set."""
+    def validation_sets(self) -> List[ItemSet] or List[ItemSetDict]:
+        """Return the validation sets."""
         raise NotImplementedError
 
-    def test_set(self) -> ItemSet or ItemSetDict:
-        """Return the test set."""
+    def test_sets(self) -> List[ItemSet] or List[ItemSetDict]:
+        """Return the test sets."""
         raise NotImplementedError
 
     def graph(self) -> object:
@@ -65,8 +66,9 @@ class OnDiskDataFormatEnum(pydantic_yaml.YamlStrEnum):
 
 class OnDiskTVTSet(pydantic.BaseModel):
     """Train-Validation-Test set."""
 
-    type_name: str
+    type_name: Optional[str]
     format: OnDiskDataFormatEnum
+    in_memory: Optional[bool] = True
     path: str
@@ -77,9 +79,9 @@ class OnDiskMetaData(pydantic_yaml.YamlModel):
     is a list of list of ``OnDiskTVTSet``.
     """
 
-    train_set: Optional[List[List[OnDiskTVTSet]]]
-    validation_set: Optional[List[List[OnDiskTVTSet]]]
-    test_set: Optional[List[List[OnDiskTVTSet]]]
+    train_sets: Optional[List[List[OnDiskTVTSet]]]
+    validation_sets: Optional[List[List[OnDiskTVTSet]]]
+    test_sets: Optional[List[List[OnDiskTVTSet]]]
 
 
 class OnDiskDataset(Dataset):
@@ -95,17 +97,20 @@ class OnDiskDataset(Dataset):
 
     .. code-block:: yaml
 
-        train_set:
-          - - type_name: paper
+        train_sets:
+          - - type_name: paper # could be null for homogeneous graph.
               format: numpy
+              in_memory: true # If not specified, default to true.
               path: set/paper-train.npy
-        validation_set:
+        validation_sets:
           - - type_name: paper
               format: numpy
+              in_memory: true
               path: set/paper-validation.npy
-        test_set:
+        test_sets:
           - - type_name: paper
               format: numpy
+              in_memory: true
               path: set/paper-test.npy
 
     Parameters
@@ -117,18 +122,21 @@ class OnDiskDataset(Dataset):
     def __init__(self, path: str) -> None:
         with open(path, "r") as f:
             self._meta = OnDiskMetaData.parse_raw(f.read(), proto="yaml")
+        self._train_sets = self._init_tvt_sets(self._meta.train_sets)
+        self._validation_sets = self._init_tvt_sets(self._meta.validation_sets)
+        self._test_sets = self._init_tvt_sets(self._meta.test_sets)
 
-    def train_set(self) -> ItemSet or ItemSetDict:
+    def train_sets(self) -> List[ItemSet] or List[ItemSetDict]:
         """Return the training set."""
-        raise NotImplementedError
+        return self._train_sets
 
-    def validation_set(self) -> ItemSet or ItemSetDict:
+    def validation_sets(self) -> List[ItemSet] or List[ItemSetDict]:
         """Return the validation set."""
-        raise NotImplementedError
+        return self._validation_sets
 
-    def test_set(self) -> ItemSet or ItemSetDict:
+    def test_sets(self) -> List[ItemSet] or List[ItemSetDict]:
         """Return the test set."""
-        raise NotImplementedError
+        return self._test_sets
 
     def graph(self) -> object:
         """Return the graph."""
@@ -137,3 +145,32 @@ class OnDiskDataset(Dataset):
     def feature(self) -> FeatureStore:
         """Return the feature."""
         raise NotImplementedError
+
+    def _init_tvt_sets(
+        self, tvt_sets: List[List[OnDiskTVTSet]]
+    ) -> List[ItemSet] or List[ItemSetDict]:
+        """Initialize the TVT sets."""
+        if (tvt_sets is None) or (len(tvt_sets) == 0):
+            return None
+        ret = []
+        for tvt_set in tvt_sets:
+            if (tvt_set is None) or (len(tvt_set) == 0):
+                ret.append(None)
+                # Skip empty entries; otherwise tvt_set[0] below would raise.
+                continue
+            if tvt_set[0].type_name is None:
+                assert (
+                    len(tvt_set) == 1
+                ), "Only one TVT set is allowed if type_name is not specified."
+                data = read_data(
+                    tvt_set[0].path, tvt_set[0].format, tvt_set[0].in_memory
+                )
+                ret.append(ItemSet(tensor_to_tuple(data)))
+            else:
+                data = {}
+                for tvt in tvt_set:
+                    data[tvt.type_name] = ItemSet(
+                        tensor_to_tuple(
+                            read_data(tvt.path, tvt.format, tvt.in_memory)
+                        )
+                    )
+                ret.append(ItemSetDict(data))
+        return ret
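The helpers ``read_data`` and ``tensor_to_tuple`` used above live in the new ``utils`` module shown next. The on-disk convention is an (N, k) array: one row per item, whose k columns become the per-item fields after the column-wise split. A minimal sketch of producing such a file (the path and field choice are illustrative, mirroring the tests further down):

import numpy as np

# One row per item; the two columns become (node_id, label) once
# tensor_to_tuple() transposes and splits the loaded tensor.
ids = np.arange(1000)
labels = np.random.randint(0, 10, size=1000)
np.save("paper-train.npy", np.vstack([ids, labels]).T)  # shape (1000, 2)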
"""Utility functions for GraphBolt."""
import numpy as np
import torch
def _read_torch_data(path):
return torch.load(path)
def _read_numpy_data(path, in_memory=True):
if in_memory:
return torch.from_numpy(np.load(path))
return torch.as_tensor(np.load(path, mmap_mode="r+"))
def read_data(path, fmt, in_memory=True):
"""Read data from disk."""
if fmt == "torch":
return _read_torch_data(path)
elif fmt == "numpy":
return _read_numpy_data(path, in_memory=in_memory)
else:
raise RuntimeError(f"Unsupported format: {fmt}")
def tensor_to_tuple(data):
"""Split a torch.Tensor in column-wise to a tuple."""
assert isinstance(data, torch.Tensor), "data must be a torch.Tensor"
return tuple(data.t())
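A round-trip sketch for the two public helpers above, assuming both are in scope (the module's import path is not shown in this diff, though ``from .utils import ...`` in the dataset module implies GraphBolt's own ``utils``). Note the non-in-memory branch memory-maps the .npy file with ``mmap_mode="r+"``, a writable mapping, so mutating the resulting tensor writes back to disk:

import os
import tempfile

import numpy as np
import torch

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "train.npy")
    np.save(path, np.vstack([np.arange(4), np.arange(4) % 2]).T)

    data = read_data(path, "numpy", in_memory=True)  # (4, 2) int64 tensor
    ids, labels = tensor_to_tuple(data)  # column-wise split via data.t()
    assert torch.equal(ids, torch.arange(4))
    assert torch.equal(labels, torch.tensor([0, 1, 0, 1]))

The updated test suite below exercises these helpers end to end.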
 import os
 import tempfile
 
+import numpy as np
 import pydantic
 import pytest
 
 from dgl import graphbolt as gb
 
@@ -9,45 +11,452 @@ from dgl import graphbolt as gb
 def test_Dataset():
     dataset = gb.Dataset()
     with pytest.raises(NotImplementedError):
-        _ = dataset.train_set()
+        _ = dataset.train_sets()
     with pytest.raises(NotImplementedError):
-        _ = dataset.validation_set()
+        _ = dataset.validation_sets()
     with pytest.raises(NotImplementedError):
-        _ = dataset.test_set()
+        _ = dataset.test_sets()
     with pytest.raises(NotImplementedError):
         _ = dataset.graph()
     with pytest.raises(NotImplementedError):
         _ = dataset.feature()
-def test_OnDiskDataset_TVTSet():
-    """Test OnDiskDataset with TVTSet."""
-    with tempfile.TemporaryDirectory() as test_dir:
-        yaml_content = """
-            train_set:
-              - - type_name: paper
-                  format: torch
-                  path: set/paper-train.pt
-                - type_name: 'paper:cites:paper'
-                  format: numpy
-                  path: set/cites-train.pt
-        """
-        yaml_file = os.path.join(test_dir, "test.yaml")
-        with open(yaml_file, "w") as f:
-            f.write(yaml_content)
-        _ = gb.OnDiskDataset(yaml_file)
-
-        # Invalid format.
-        yaml_content = """
-            train_set:
-              - - type_name: paper
-                  format: torch_invalid
-                  path: set/paper-train.pt
-                - type_name: 'paper:cites:paper'
-                  format: numpy_invalid
-                  path: set/cites-train.pt
-        """
-        with open(yaml_file, "w") as f:
-            f.write(yaml_content)
-        with pytest.raises(pydantic.ValidationError):
-            _ = gb.OnDiskDataset(yaml_file)
+def test_OnDiskDataset_TVTSet_exceptions():
+    """Test exceptions thrown when parsing TVTSet."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        yaml_file = os.path.join(test_dir, "test.yaml")
+
+        # Case 1: ``format`` is invalid.
+        yaml_content = """
+            train_sets:
+              - - type_name: paper
+                  format: torch_invalid
+                  path: set/paper-train.pt
+        """
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+        with pytest.raises(pydantic.ValidationError):
+            _ = gb.OnDiskDataset(yaml_file)
+
+        # Case 2: ``type_name`` is not specified while multiple TVT sets
+        # are specified.
+        yaml_content = """
+            train_sets:
+              - - type_name: null
+                  format: numpy
+                  path: set/train.npy
+                - type_name: null
+                  format: numpy
+                  path: set/train.npy
+        """
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+        with pytest.raises(
+            AssertionError,
+            match=r"Only one TVT set is allowed if type_name is not specified.",
+        ):
+            _ = gb.OnDiskDataset(yaml_file)
+
+def test_OnDiskDataset_TVTSet_ItemSet_id_label():
+    """Test TVTSet which returns ItemSet with IDs and labels."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        train_ids = np.arange(1000)
+        train_labels = np.random.randint(0, 10, size=1000)
+        train_data = np.vstack([train_ids, train_labels]).T
+        train_path = os.path.join(test_dir, "train.npy")
+        np.save(train_path, train_data)
+
+        validation_ids = np.arange(1000, 2000)
+        validation_labels = np.random.randint(0, 10, size=1000)
+        validation_data = np.vstack([validation_ids, validation_labels]).T
+        validation_path = os.path.join(test_dir, "validation.npy")
+        np.save(validation_path, validation_data)
+
+        test_ids = np.arange(2000, 3000)
+        test_labels = np.random.randint(0, 10, size=1000)
+        test_data = np.vstack([test_ids, test_labels]).T
+        test_path = os.path.join(test_dir, "test.npy")
+        np.save(test_path, test_data)
+
+        # Case 1:
+        #   all TVT sets are specified.
+        #   ``type_name`` is not specified or specified as ``null``.
+        #   ``in_memory`` could be ``true`` and ``false``.
+        yaml_content = f"""
+            train_sets:
+              - - type_name: null
+                  format: numpy
+                  in_memory: true
+                  path: {train_path}
+              - - type_name: null
+                  format: numpy
+                  path: {train_path}
+            validation_sets:
+              - - format: numpy
+                  path: {validation_path}
+              - - type_name: null
+                  format: numpy
+                  path: {validation_path}
+            test_sets:
+              - - type_name: null
+                  format: numpy
+                  in_memory: false
+                  path: {test_path}
+              - - type_name: null
+                  format: numpy
+                  path: {test_path}
+        """
+        yaml_file = os.path.join(test_dir, "test.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+        dataset = gb.OnDiskDataset(yaml_file)
+
+        # Verify train set.
+        train_sets = dataset.train_sets()
+        assert len(train_sets) == 2
+        for train_set in train_sets:
+            assert len(train_set) == 1000
+            assert isinstance(train_set, gb.ItemSet)
+            for i, (id, label) in enumerate(train_set):
+                assert id == train_ids[i]
+                assert label == train_labels[i]
+        train_sets = None
+
+        # Verify validation set.
+        validation_sets = dataset.validation_sets()
+        assert len(validation_sets) == 2
+        for validation_set in validation_sets:
+            assert len(validation_set) == 1000
+            assert isinstance(validation_set, gb.ItemSet)
+            for i, (id, label) in enumerate(validation_set):
+                assert id == validation_ids[i]
+                assert label == validation_labels[i]
+        validation_sets = None
+
+        # Verify test set.
+        test_sets = dataset.test_sets()
+        assert len(test_sets) == 2
+        for test_set in test_sets:
+            assert len(test_set) == 1000
+            assert isinstance(test_set, gb.ItemSet)
+            for i, (id, label) in enumerate(test_set):
+                assert id == test_ids[i]
+                assert label == test_labels[i]
+        test_sets = None
+        dataset = None
+
+        # Case 2: Some TVT sets are None.
+        yaml_content = f"""
+            train_sets:
+              - - type_name: null
+                  format: numpy
+                  path: {train_path}
+        """
+        yaml_file = os.path.join(test_dir, "test.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+        dataset = gb.OnDiskDataset(yaml_file)
+        assert dataset.train_sets() is not None
+        assert dataset.validation_sets() is None
+        assert dataset.test_sets() is None
+        dataset = None
+
+def test_OnDiskDataset_TVTSet_ItemSet_node_pair_label():
+    """Test TVTSet which returns ItemSet with node pairs and labels."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        train_pairs = (np.arange(1000), np.arange(1000, 2000))
+        train_labels = np.random.randint(0, 10, size=1000)
+        train_data = np.vstack([train_pairs, train_labels]).T
+        train_path = os.path.join(test_dir, "train.npy")
+        np.save(train_path, train_data)
+
+        validation_pairs = (np.arange(1000, 2000), np.arange(2000, 3000))
+        validation_labels = np.random.randint(0, 10, size=1000)
+        validation_data = np.vstack([validation_pairs, validation_labels]).T
+        validation_path = os.path.join(test_dir, "validation.npy")
+        np.save(validation_path, validation_data)
+
+        test_pairs = (np.arange(2000, 3000), np.arange(3000, 4000))
+        test_labels = np.random.randint(0, 10, size=1000)
+        test_data = np.vstack([test_pairs, test_labels]).T
+        test_path = os.path.join(test_dir, "test.npy")
+        np.save(test_path, test_data)
+
+        yaml_content = f"""
+            train_sets:
+              - - type_name: null
+                  format: numpy
+                  in_memory: true
+                  path: {train_path}
+              - - type_name: null
+                  format: numpy
+                  path: {train_path}
+            validation_sets:
+              - - format: numpy
+                  path: {validation_path}
+              - - type_name: null
+                  format: numpy
+                  path: {validation_path}
+            test_sets:
+              - - type_name: null
+                  format: numpy
+                  in_memory: false
+                  path: {test_path}
+              - - type_name: null
+                  format: numpy
+                  path: {test_path}
+        """
+        yaml_file = os.path.join(test_dir, "test.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+        dataset = gb.OnDiskDataset(yaml_file)
+
+        # Verify train set.
+        train_sets = dataset.train_sets()
+        assert len(train_sets) == 2
+        for train_set in train_sets:
+            assert len(train_set) == 1000
+            assert isinstance(train_set, gb.ItemSet)
+            for i, (src, dst, label) in enumerate(train_set):
+                assert src == train_pairs[0][i]
+                assert dst == train_pairs[1][i]
+                assert label == train_labels[i]
+        train_sets = None
+
+        # Verify validation set.
+        validation_sets = dataset.validation_sets()
+        assert len(validation_sets) == 2
+        for validation_set in validation_sets:
+            assert len(validation_set) == 1000
+            assert isinstance(validation_set, gb.ItemSet)
+            for i, (src, dst, label) in enumerate(validation_set):
+                assert src == validation_pairs[0][i]
+                assert dst == validation_pairs[1][i]
+                assert label == validation_labels[i]
+        validation_sets = None
+
+        # Verify test set.
+        test_sets = dataset.test_sets()
+        assert len(test_sets) == 2
+        for test_set in test_sets:
+            assert len(test_set) == 1000
+            assert isinstance(test_set, gb.ItemSet)
+            for i, (src, dst, label) in enumerate(test_set):
+                assert src == test_pairs[0][i]
+                assert dst == test_pairs[1][i]
+                assert label == test_labels[i]
+        test_sets = None
+        dataset = None
+
+def test_OnDiskDataset_TVTSet_ItemSetDict_id_label():
+    """Test TVTSet which returns ItemSetDict with IDs and labels."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        train_ids = np.arange(1000)
+        train_labels = np.random.randint(0, 10, size=1000)
+        train_data = np.vstack([train_ids, train_labels]).T
+        train_path = os.path.join(test_dir, "train.npy")
+        np.save(train_path, train_data)
+
+        validation_ids = np.arange(1000, 2000)
+        validation_labels = np.random.randint(0, 10, size=1000)
+        validation_data = np.vstack([validation_ids, validation_labels]).T
+        validation_path = os.path.join(test_dir, "validation.npy")
+        np.save(validation_path, validation_data)
+
+        test_ids = np.arange(2000, 3000)
+        test_labels = np.random.randint(0, 10, size=1000)
+        test_data = np.vstack([test_ids, test_labels]).T
+        test_path = os.path.join(test_dir, "test.npy")
+        np.save(test_path, test_data)
+
+        yaml_content = f"""
+            train_sets:
+              - - type_name: paper
+                  format: numpy
+                  in_memory: true
+                  path: {train_path}
+              - - type_name: author
+                  format: numpy
+                  path: {train_path}
+            validation_sets:
+              - - type_name: paper
+                  format: numpy
+                  path: {validation_path}
+              - - type_name: author
+                  format: numpy
+                  path: {validation_path}
+            test_sets:
+              - - type_name: paper
+                  format: numpy
+                  in_memory: false
+                  path: {test_path}
+              - - type_name: author
+                  format: numpy
+                  path: {test_path}
+        """
+        yaml_file = os.path.join(test_dir, "test.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+        dataset = gb.OnDiskDataset(yaml_file)
+
+        # Verify train set.
+        train_sets = dataset.train_sets()
+        assert len(train_sets) == 2
+        for train_set in train_sets:
+            assert len(train_set) == 1000
+            assert isinstance(train_set, gb.ItemSetDict)
+            for i, item in enumerate(train_set):
+                assert isinstance(item, dict)
+                assert len(item) == 1
+                key = list(item.keys())[0]
+                assert key in ["paper", "author"]
+                id, label = item[key]
+                assert id == train_ids[i]
+                assert label == train_labels[i]
+        train_sets = None
+
+        # Verify validation set.
+        validation_sets = dataset.validation_sets()
+        assert len(validation_sets) == 2
+        for validation_set in validation_sets:
+            assert len(validation_set) == 1000
+            assert isinstance(validation_set, gb.ItemSetDict)
+            for i, item in enumerate(validation_set):
+                assert isinstance(item, dict)
+                assert len(item) == 1
+                key = list(item.keys())[0]
+                assert key in ["paper", "author"]
+                id, label = item[key]
+                assert id == validation_ids[i]
+                assert label == validation_labels[i]
+        validation_sets = None
+
+        # Verify test set.
+        test_sets = dataset.test_sets()
+        assert len(test_sets) == 2
+        for test_set in test_sets:
+            assert len(test_set) == 1000
+            assert isinstance(test_set, gb.ItemSetDict)
+            for i, item in enumerate(test_set):
+                assert isinstance(item, dict)
+                assert len(item) == 1
+                key = list(item.keys())[0]
+                assert key in ["paper", "author"]
+                id, label = item[key]
+                assert id == test_ids[i]
+                assert label == test_labels[i]
+        test_sets = None
+        dataset = None
+
+def test_OnDiskDataset_TVTSet_ItemSetDict_node_pair_label():
+    """Test TVTSet which returns ItemSetDict with node pairs and labels."""
+    with tempfile.TemporaryDirectory() as test_dir:
+        train_pairs = (np.arange(1000), np.arange(1000, 2000))
+        train_labels = np.random.randint(0, 10, size=1000)
+        train_data = np.vstack([train_pairs, train_labels]).T
+        train_path = os.path.join(test_dir, "train.npy")
+        np.save(train_path, train_data)
+
+        validation_pairs = (np.arange(1000, 2000), np.arange(2000, 3000))
+        validation_labels = np.random.randint(0, 10, size=1000)
+        validation_data = np.vstack([validation_pairs, validation_labels]).T
+        validation_path = os.path.join(test_dir, "validation.npy")
+        np.save(validation_path, validation_data)
+
+        test_pairs = (np.arange(2000, 3000), np.arange(3000, 4000))
+        test_labels = np.random.randint(0, 10, size=1000)
+        test_data = np.vstack([test_pairs, test_labels]).T
+        test_path = os.path.join(test_dir, "test.npy")
+        np.save(test_path, test_data)
+
+        yaml_content = f"""
+            train_sets:
+              - - type_name: paper
+                  format: numpy
+                  in_memory: true
+                  path: {train_path}
+              - - type_name: author
+                  format: numpy
+                  path: {train_path}
+            validation_sets:
+              - - type_name: paper
+                  format: numpy
+                  path: {validation_path}
+              - - type_name: author
+                  format: numpy
+                  path: {validation_path}
+            test_sets:
+              - - type_name: paper
+                  format: numpy
+                  in_memory: false
+                  path: {test_path}
+              - - type_name: author
+                  format: numpy
+                  path: {test_path}
+        """
+        yaml_file = os.path.join(test_dir, "test.yaml")
+        with open(yaml_file, "w") as f:
+            f.write(yaml_content)
+        dataset = gb.OnDiskDataset(yaml_file)
+
+        # Verify train set.
+        train_sets = dataset.train_sets()
+        assert len(train_sets) == 2
+        for train_set in train_sets:
+            assert len(train_set) == 1000
+            assert isinstance(train_set, gb.ItemSetDict)
+            for i, item in enumerate(train_set):
+                assert isinstance(item, dict)
+                assert len(item) == 1
+                key = list(item.keys())[0]
+                assert key in ["paper", "author"]
+                src, dst, label = item[key]
+                assert src == train_pairs[0][i]
+                assert dst == train_pairs[1][i]
+                assert label == train_labels[i]
+        train_sets = None
+
+        # Verify validation set.
+        validation_sets = dataset.validation_sets()
+        assert len(validation_sets) == 2
+        for validation_set in validation_sets:
+            assert len(validation_set) == 1000
+            assert isinstance(validation_set, gb.ItemSetDict)
+            for i, item in enumerate(validation_set):
+                assert isinstance(item, dict)
+                assert len(item) == 1
+                key = list(item.keys())[0]
+                assert key in ["paper", "author"]
+                src, dst, label = item[key]
+                assert src == validation_pairs[0][i]
+                assert dst == validation_pairs[1][i]
+                assert label == validation_labels[i]
+        validation_sets = None
+
+        # Verify test set.
+        test_sets = dataset.test_sets()
+        assert len(test_sets) == 2
+        for test_set in test_sets:
+            assert len(test_set) == 1000
+            assert isinstance(test_set, gb.ItemSetDict)
+            for i, item in enumerate(test_set):
+                assert isinstance(item, dict)
+                assert len(item) == 1
+                key = list(item.keys())[0]
+                assert key in ["paper", "author"]
+                src, dst, label = item[key]
+                assert src == test_pairs[0][i]
+                assert dst == test_pairs[1][i]
+                assert label == test_labels[i]
+        test_sets = None
+        dataset = None