Unverified commit f50cc3ca authored by Rhett Ying, committed by GitHub

[GraphBolt] update FeatureStore to use tuple as key (#5965)

parent 90a308f3
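
This change keys every loaded feature store by a (domain, type, name) tuple instead of the bare feature name, so same-named features on different node or edge types no longer collide. Below is a minimal sketch of the before/after lookup, assuming the API shown in the diff that follows; the tensor, path, and type names are illustrative only:

import torch
from dgl import graphbolt as gb

# Illustrative only: persist one node feature to disk.
torch.save(torch.tensor([[1, 2], [3, 4]]), "/tmp/feat.pt")

feat_data = [
    gb.OnDiskFeatureData(
        domain="node",   # new required field
        type="paper",    # new optional field; None for homogeneous graphs
        name="feat",
        format="torch",
        path="/tmp/feat.pt",
        in_memory=True,
    ),
]
feat_stores = gb.load_feature_stores(feat_data)

# Before this commit the key was the name alone: feat_stores["feat"]
# After this commit keys are (domain, type, name) tuples:
feat = feat_stores[("node", "paper", "feat")].read()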
"""Feature store for GraphBolt."""
from typing import List
from typing import List, Optional
import numpy as np
import pydantic
......@@ -142,21 +142,30 @@ class TorchBasedFeatureStore(FeatureStore):
self._tensor[ids] = value
# FIXME(Rui): To avoid circular import, we make a copy of `OnDiskDataFormatEnum`
# from dataset.py. We need to merge the two definitions later.
class OnDiskDataFormatEnum(pydantic_yaml.YamlStrEnum):
"""Enum of data format."""
# [TODO] Move code to 'impl/' and separate OnDisk-related code to another file.
class FeatureDataFormatEnum(pydantic_yaml.YamlStrEnum):
"""Enum of feature data format."""
TORCH = "torch"
NUMPY = "numpy"
class FeatureDataDomainEnum(pydantic_yaml.YamlStrEnum):
"""Enum of feature data domain."""
NODE = "node"
EDGE = "edge"
GRAPH = "graph"
class OnDiskFeatureData(pydantic.BaseModel):
r"""The description of an on-disk feature."""
domain: FeatureDataDomainEnum
type: Optional[str]
name: str
format: OnDiskDataFormatEnum
format: FeatureDataFormatEnum
path: str
in_memory: bool = True
in_memory: Optional[bool] = True
def load_feature_stores(feat_data: List[OnDiskFeatureData]):
@@ -186,25 +195,27 @@ def load_feature_stores(feat_data: List[OnDiskFeatureData]):
     >>> import torch
     >>> import numpy as np
     >>> from dgl import graphbolt as gb
-    >>> a = torch.tensor([1, 2, 3])
-    >>> b = torch.tensor([[1, 2, 3], [4, 5, 6]])
-    >>> torch.save(a, "/tmp/a.pt")
-    >>> np.save("/tmp/b.npy", b.numpy())
+    >>> edge_label = torch.tensor([1, 2, 3])
+    >>> node_feat = torch.tensor([[1, 2, 3], [4, 5, 6]])
+    >>> torch.save(edge_label, "/tmp/edge_label.pt")
+    >>> np.save("/tmp/node_feat.npy", node_feat.numpy())
     >>> feat_data = [
-    ...     gb.OnDiskFeatureData(name="a", format="torch", path="/tmp/a.pt",
+    ...     gb.OnDiskFeatureData(domain="edge", type="author:writes:paper",
+    ...     name="label", format="torch", path="/tmp/edge_label.pt",
     ...     in_memory=True),
-    ...     gb.OnDiskFeatureData(name="b", format="numpy", path="/tmp/b.npy",
-    ...     in_memory=False),
+    ...     gb.OnDiskFeatureData(domain="node", type="paper", name="feat",
+    ...     format="numpy", path="/tmp/node_feat.npy", in_memory=False),
     ... ]
     >>> gb.load_feature_stores(feat_data)
-    {'a': <dgl.graphbolt.feature_store.TorchBasedFeatureStore object at
-    0x7ff093cb4df0>, 'b':
+    {("edge", "author:writes:paper", "label"):
+    <dgl.graphbolt.feature_store.TorchBasedFeatureStore object at
+    0x7ff093cb4df0>, ("node", "paper", "feat"):
     <dgl.graphbolt.feature_store.TorchBasedFeatureStore object at
     0x7ff093cb4dc0>}
     """
     feat_stores = {}
     for spec in feat_data:
-        key = spec.name
+        key = (spec.domain, spec.type, spec.name)
         if spec.format == "torch":
             assert spec.in_memory, (
                 f"Pytorch tensor can only be loaded in memory, "
@@ -2,6 +2,7 @@ import os
 import tempfile

 import numpy as np
+import pydantic
 import pytest
 import torch

 from dgl import graphbolt as gb
@@ -59,12 +60,14 @@ def test_torch_based_feature_store(in_memory):
     feat_store_a = feat_store_b = None


-def write_tensor_to_disk(dir, name, t, fmt="pt"):
-    if fmt == "pt":
+def write_tensor_to_disk(dir, name, t, fmt="torch"):
+    if fmt == "torch":
         torch.save(t, os.path.join(dir, name + ".pt"))
-    else:
+    elif fmt == "numpy":
         t = t.numpy()
         np.save(os.path.join(dir, name + ".npy"), t)
+    else:
+        raise ValueError(f"Unsupported format: {fmt}")


 @pytest.mark.parametrize("in_memory", [True, False])
@@ -72,16 +75,20 @@ def test_load_feature_stores(in_memory):
     with tempfile.TemporaryDirectory() as test_dir:
         a = torch.tensor([1, 2, 3])
         b = torch.tensor([2, 5, 3])
-        write_tensor_to_disk(test_dir, "a", a, fmt="pt")
-        write_tensor_to_disk(test_dir, "b", b, fmt="npy")
+        write_tensor_to_disk(test_dir, "a", a, fmt="torch")
+        write_tensor_to_disk(test_dir, "b", b, fmt="numpy")
         feat_data = [
             gb.OnDiskFeatureData(
+                domain="node",
+                type="paper",
                 name="a",
                 format="torch",
                 path=os.path.join(test_dir, "a.pt"),
                 in_memory=True,
             ),
             gb.OnDiskFeatureData(
+                domain="edge",
+                type="paper-cites-paper",
                 name="b",
                 format="numpy",
                 path=os.path.join(test_dir, "b.npy"),
@@ -89,10 +96,40 @@ def test_load_feature_stores(in_memory):
             ),
         ]
         feat_stores = gb.load_feature_stores(feat_data)
-        assert torch.equal(feat_stores["a"].read(), torch.tensor([1, 2, 3]))
-        assert torch.equal(feat_stores["b"].read(), torch.tensor([2, 5, 3]))
+        assert torch.equal(
+            feat_stores[("node", "paper", "a")].read(), torch.tensor([1, 2, 3])
+        )
+        assert torch.equal(
+            feat_stores[("edge", "paper-cites-paper", "b")].read(),
+            torch.tensor([2, 5, 3]),
+        )

         # On Windows, the file is locked by numpy.load. We need to delete
         # it before closing the temporary directory.
         a = b = None
         feat_stores = None
+
+        # ``domain`` must be a valid enum value.
+        with pytest.raises(pydantic.ValidationError):
+            _ = gb.OnDiskFeatureData(
+                domain="invalid",
+                type="paper",
+                name="a",
+                format="torch",
+                path=os.path.join(test_dir, "a.pt"),
+                in_memory=True,
+            )
+
+        # ``type`` may be null.
+        feat_data = [
+            gb.OnDiskFeatureData(
+                domain="node",
+                name="a",
+                format="torch",
+                path=os.path.join(test_dir, "a.pt"),
+                in_memory=True,
+            ),
+        ]
+        feat_stores = gb.load_feature_stores(feat_data)
+        assert ("node", None, "a") in feat_stores
+        feat_stores = None
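
As the last assertion above checks, the new type field may be omitted for homogeneous data; pydantic then fills it with None, which becomes the middle slot of the key. A short usage sketch under the same assumptions (the temporary path and names are illustrative):

import os
import tempfile

import torch
from dgl import graphbolt as gb

with tempfile.TemporaryDirectory() as test_dir:
    path = os.path.join(test_dir, "a.pt")
    torch.save(torch.tensor([1, 2, 3]), path)
    # ``type`` is not given, so it defaults to None.
    spec = gb.OnDiskFeatureData(
        domain="node", name="a", format="torch", path=path
    )
    feat_stores = gb.load_feature_stores([spec])
    assert ("node", None, "a") in feat_stores
    # Release the store before the temporary directory is removed.
    feat_stores = None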