Unverified commit 5fbb33e7 authored by Rhett Ying, committed by GitHub

[GraphBolt] init feature data for Dataset (#5971)

parent 70ad5083
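In short: ``Dataset.feature()`` now returns a dict of feature stores instead of a single store, and ``OnDiskDataset`` populates it from the ``feature_data`` section of its YAML metadata. A minimal usage sketch, assuming a metadata file like the docstring example further down; the "metadata.yaml" path and key values here are illustrative:

from dgl import graphbolt as gb

# Load a dataset description; "metadata.yaml" is an illustrative path.
dataset = gb.OnDiskDataset("metadata.yaml")

# feature() returns a dict keyed by (domain, type, name) tuples.
feature_data = dataset.feature()
paper_feat = feature_data[("node", "paper", "feat")]

# Each value is a TorchBasedFeatureStore; read() yields the full tensor.
print(paper_feat.read().shape)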
"""GraphBolt Dataset."""
from typing import List
from typing import Dict, List
from .feature_store import FeatureStore
from .itemset import ItemSet, ItemSetDict
......@@ -47,6 +47,6 @@ class Dataset:
"""Return the graph."""
raise NotImplementedError
def feature(self) -> FeatureStore:
def feature(self) -> Dict[object, FeatureStore]:
"""Return the feature."""
raise NotImplementedError
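The base class deliberately keeps the key type as ``object`` so each ``Dataset`` subclass can choose its own key layout; ``OnDiskDataset`` below narrows it to (domain, type, name) tuples. A hedged helper sketch built only on the ``feature()`` contract; the function name, and the assumption that ``Dataset`` and ``FeatureStore`` are re-exported from ``dgl.graphbolt`` like the concrete classes used in the tests below, are mine:

from typing import Dict

from dgl import graphbolt as gb


def features_named(
    dataset: "gb.Dataset", name: str
) -> Dict[object, "gb.FeatureStore"]:
    """Collect every feature store whose tuple key ends with ``name``.

    Assumes (domain, type, name) keys as produced by OnDiskDataset;
    other Dataset subclasses may key their stores differently.
    """
    return {
        key: store
        for key, store in dataset.feature().items()
        if isinstance(key, tuple) and key and key[-1] == name
    }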
"""GraphBolt OnDiskDataset."""
from typing import List
from typing import Dict, List, Tuple
from ..dataset import Dataset
from ..feature_store import FeatureStore
from ..itemset import ItemSet, ItemSetDict
from ..utils import read_data, tensor_to_tuple
from .ondisk_metadata import OnDiskMetaData, OnDiskTVTSet
from .torch_based_feature_store import (
load_feature_stores,
TorchBasedFeatureStore,
)
__all__ = ["OnDiskDataset"]
@@ -24,6 +27,19 @@ class OnDiskDataset(Dataset):

    .. code-block:: yaml

+        feature_data:
+          - domain: node
+            type: paper
+            name: feat
+            format: numpy
+            in_memory: false
+            path: node_data/paper-feat.npy
+          - domain: edge
+            type: "author:writes:paper"
+            name: feat
+            format: numpy
+            in_memory: false
+            path: edge_data/author-writes-paper-feat.npy
        train_sets:
          - - type_name: paper # could be null for homogeneous graph.
              format: numpy
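The ``.npy`` files referenced by the docstring example above are plain NumPy dumps, matching what the new tests below create with ``np.save``. A small sketch of producing the node feature file from that example; the array shape is illustrative:

import os

import numpy as np

# Create the node feature file referenced as node_data/paper-feat.npy;
# 1000 "paper" nodes with 10-dimensional features is an arbitrary shape.
os.makedirs("node_data", exist_ok=True)
paper_feat = np.random.rand(1000, 10)
np.save(os.path.join("node_data", "paper-feat.npy"), paper_feat)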
@@ -49,6 +65,7 @@ class OnDiskDataset(Dataset):

    def __init__(self, path: str) -> None:
        with open(path, "r") as f:
            self._meta = OnDiskMetaData.parse_raw(f.read(), proto="yaml")
+        self._feature = load_feature_stores(self._meta.feature_data)
        self._train_sets = self._init_tvt_sets(self._meta.train_sets)
        self._validation_sets = self._init_tvt_sets(self._meta.validation_sets)
        self._test_sets = self._init_tvt_sets(self._meta.test_sets)

@@ -69,9 +86,9 @@ class OnDiskDataset(Dataset):
        """Return the graph."""
        raise NotImplementedError

-    def feature(self) -> FeatureStore:
+    def feature(self) -> Dict[Tuple, TorchBasedFeatureStore]:
        """Return the feature."""
-        raise NotImplementedError
+        return self._feature

    def _init_tvt_sets(
        self, tvt_sets: List[List[OnDiskTVTSet]]
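``load_feature_stores`` itself is not part of this diff; based on the metadata fields and the (domain, type, name) keys asserted in the new tests, it plausibly behaves like the sketch below. This is illustrative only: the real helper returns ``TorchBasedFeatureStore`` objects (their constructor is not shown here), and treating ``in_memory: false`` as a memory-mapped load is an assumption.

from typing import Dict, List, Tuple

import numpy as np
import torch


def load_feature_tensors(feature_data: List) -> Dict[Tuple, torch.Tensor]:
    """Illustrative stand-in for load_feature_stores (returns raw tensors)."""
    tensors = {}
    for spec in feature_data:
        # Only the "numpy" format appears in this commit's examples and tests.
        assert spec.format == "numpy"
        # Assumption: in_memory=false is honored via a memory-mapped load.
        mmap_mode = None if spec.in_memory else "r"
        array = np.load(spec.path, mmap_mode=mmap_mode)
        tensors[(spec.domain, spec.type, spec.name)] = torch.from_numpy(array)
    return tensors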
@@ -56,6 +56,7 @@ class OnDiskMetaData(pydantic_yaml.YamlModel):
    is a list of list of ``OnDiskTVTSet``.
    """

-    train_sets: Optional[List[List[OnDiskTVTSet]]]
-    validation_sets: Optional[List[List[OnDiskTVTSet]]]
-    test_sets: Optional[List[List[OnDiskTVTSet]]]
+    feature_data: Optional[List[OnDiskFeatureData]] = []
+    train_sets: Optional[List[List[OnDiskTVTSet]]] = []
+    validation_sets: Optional[List[List[OnDiskTVTSet]]] = []
+    test_sets: Optional[List[List[OnDiskTVTSet]]] = []
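``OnDiskFeatureData`` is referenced above but defined outside this hunk. Judging from the YAML fields used throughout the commit (domain, type, name, format, in_memory, path), it is plausibly a pydantic model along the lines of this sketch; the class name and the field defaults here are assumptions:

from typing import Optional

import pydantic


class OnDiskFeatureDataSketch(pydantic.BaseModel):
    """Illustrative approximation of the real OnDiskFeatureData model."""

    domain: str  # "node" or "edge".
    type: Optional[str] = None  # None for homogeneous graphs.
    name: str  # e.g. "feat" or "label".
    format: str  # Only "numpy" appears in this commit.
    in_memory: Optional[bool] = True  # Default value is an assumption.
    path: str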
@@ -5,6 +5,7 @@ import numpy as np
import pydantic
import pytest
import torch

from dgl import graphbolt as gb

@@ -446,3 +447,172 @@ def test_OnDiskDataset_TVTSet_ItemSetDict_node_pair_label():
            assert label == test_labels[i]
        test_sets = None
        dataset = None


def test_OnDiskDataset_Feature_heterograph():
    """Test Feature storage."""
    with tempfile.TemporaryDirectory() as test_dir:
        # Generate node data.
        node_data_paper = np.random.rand(1000, 10)
        node_data_paper_path = os.path.join(test_dir, "node_data_paper.npy")
        np.save(node_data_paper_path, node_data_paper)
        node_data_label = np.random.randint(0, 10, size=1000)
        node_data_label_path = os.path.join(test_dir, "node_data_label.npy")
        np.save(node_data_label_path, node_data_label)

        # Generate edge data.
        edge_data_writes = np.random.rand(1000, 10)
        edge_data_writes_path = os.path.join(test_dir, "edge_writes_paper.npy")
        np.save(edge_data_writes_path, edge_data_writes)
        edge_data_label = np.random.randint(0, 10, size=1000)
        edge_data_label_path = os.path.join(test_dir, "edge_data_label.npy")
        np.save(edge_data_label_path, edge_data_label)

        # Generate YAML.
        yaml_content = f"""
            feature_data:
              - domain: node
                type: paper
                name: feat
                format: numpy
                in_memory: false
                path: {node_data_paper_path}
              - domain: node
                type: paper
                name: label
                format: numpy
                in_memory: true
                path: {node_data_label_path}
              - domain: edge
                type: "author:writes:paper"
                name: feat
                format: numpy
                in_memory: false
                path: {edge_data_writes_path}
              - domain: edge
                type: "author:writes:paper"
                name: label
                format: numpy
                in_memory: true
                path: {edge_data_label_path}
        """
        yaml_file = os.path.join(test_dir, "test.yaml")
        with open(yaml_file, "w") as f:
            f.write(yaml_content)

        dataset = gb.OnDiskDataset(yaml_file)

        # Verify feature data storage.
        feature_data = dataset.feature()
        assert len(feature_data) == 4

        # Verify node feature data.
        node_paper_feat = feature_data[("node", "paper", "feat")]
        assert isinstance(node_paper_feat, gb.TorchBasedFeatureStore)
        assert torch.equal(
            node_paper_feat.read(), torch.tensor(node_data_paper)
        )
        node_paper_label = feature_data[("node", "paper", "label")]
        assert isinstance(node_paper_label, gb.TorchBasedFeatureStore)
        assert torch.equal(
            node_paper_label.read(), torch.tensor(node_data_label)
        )

        # Verify edge feature data.
        edge_writes_feat = feature_data[("edge", "author:writes:paper", "feat")]
        assert isinstance(edge_writes_feat, gb.TorchBasedFeatureStore)
        assert torch.equal(
            edge_writes_feat.read(), torch.tensor(edge_data_writes)
        )
        edge_writes_label = feature_data[
            ("edge", "author:writes:paper", "label")
        ]
        assert isinstance(edge_writes_label, gb.TorchBasedFeatureStore)
        assert torch.equal(
            edge_writes_label.read(), torch.tensor(edge_data_label)
        )

        node_paper_feat = None
        node_paper_label = None
        edge_writes_feat = None
        edge_writes_label = None
        feature_data = None
        dataset = None


def test_OnDiskDataset_Feature_homograph():
    """Test Feature storage."""
    with tempfile.TemporaryDirectory() as test_dir:
        # Generate node data.
        node_data_feat = np.random.rand(1000, 10)
        node_data_feat_path = os.path.join(test_dir, "node_data_feat.npy")
        np.save(node_data_feat_path, node_data_feat)
        node_data_label = np.random.randint(0, 10, size=1000)
        node_data_label_path = os.path.join(test_dir, "node_data_label.npy")
        np.save(node_data_label_path, node_data_label)

        # Generate edge data.
        edge_data_feat = np.random.rand(1000, 10)
        edge_data_feat_path = os.path.join(test_dir, "edge_data_feat.npy")
        np.save(edge_data_feat_path, edge_data_feat)
        edge_data_label = np.random.randint(0, 10, size=1000)
        edge_data_label_path = os.path.join(test_dir, "edge_data_label.npy")
        np.save(edge_data_label_path, edge_data_label)

        # Generate YAML.
        # ``type`` is not specified in the YAML.
        yaml_content = f"""
            feature_data:
              - domain: node
                name: feat
                format: numpy
                in_memory: false
                path: {node_data_feat_path}
              - domain: node
                name: label
                format: numpy
                in_memory: true
                path: {node_data_label_path}
              - domain: edge
                name: feat
                format: numpy
                in_memory: false
                path: {edge_data_feat_path}
              - domain: edge
                name: label
                format: numpy
                in_memory: true
                path: {edge_data_label_path}
        """
        yaml_file = os.path.join(test_dir, "test.yaml")
        with open(yaml_file, "w") as f:
            f.write(yaml_content)

        dataset = gb.OnDiskDataset(yaml_file)

        # Verify feature data storage.
        feature_data = dataset.feature()
        assert len(feature_data) == 4

        # Verify node feature data.
        node_feat = feature_data[("node", None, "feat")]
        assert isinstance(node_feat, gb.TorchBasedFeatureStore)
        assert torch.equal(node_feat.read(), torch.tensor(node_data_feat))
        node_label = feature_data[("node", None, "label")]
        assert isinstance(node_label, gb.TorchBasedFeatureStore)
        assert torch.equal(node_label.read(), torch.tensor(node_data_label))

        # Verify edge feature data.
        edge_feat = feature_data[("edge", None, "feat")]
        assert isinstance(edge_feat, gb.TorchBasedFeatureStore)
        assert torch.equal(edge_feat.read(), torch.tensor(edge_data_feat))
        edge_label = feature_data[("edge", None, "label")]
        assert isinstance(edge_label, gb.TorchBasedFeatureStore)
        assert torch.equal(edge_label.read(), torch.tensor(edge_data_label))

        node_feat = None
        node_label = None
        edge_feat = None
        edge_label = None
        feature_data = None
        dataset = None