Unverified Commit 40dcc715 authored by czkkkkkk, committed by GitHub

[Graphbolt] Support on-disk feature store. (#5914)

parent 69d9b726
@@ -49,18 +49,20 @@ class FeatureStore:
raise NotImplementedError
class InMemoryFeatureStore(FeatureStore):
r"""In-memory key-value feature store, where the key is a string and value
is Pytorch tensor."""
class TorchBasedFeatureStore(FeatureStore):
r"""Torch based key-value feature store, where the key are strings and
values are Pytorch tensors."""
def __init__(self, feature_dict: dict):
"""Initialize an in-memory feature store.
"""Initialize a torch based feature store.
The feature store is initialized with a dictionary of tensors, where the
key is the name of a feature and the value is the tensor. The value can
be multi-dimensional, where the first dimension is the index of the
feature.
Note that the values can be in memory or on disk.
Parameters
----------
feature_dict : dict, optional
@@ -74,7 +76,7 @@ class InMemoryFeatureStore(FeatureStore):
... "item": torch.arange(0, 6),
... "rel": torch.arange(0, 6).view(2, 3),
... }
>>> feature_store = InMemoryFeatureStore(feature_dict)
>>> feature_store = TorchBasedFeatureStore(feature_dict)
>>> feature_store.read("user", torch.tensor([0, 1, 2]))
tensor([0, 1, 2])
>>> feature_store.read("item", torch.tensor([0, 1, 2]))
@@ -85,18 +87,35 @@ class InMemoryFeatureStore(FeatureStore):
... torch.ones(3, dtype=torch.long), torch.tensor([0, 1, 2]))
>>> feature_store.read("user", torch.tensor([0, 1, 2]))
tensor([1, 1, 1])
>>> import numpy as np
>>> user = np.arange(0, 5)
>>> item = np.arange(0, 6)
>>> np.save("/tmp/user.npy", user)
>>> np.save("/tmp/item.npy", item)
>>> feature_dict = {
... "user": torch.as_tensor(np.load("/tmp/user.npy",
... mmap_mode="r+")),
... "item": torch.as_tensor(np.load("/tmp/item.npy",
... mmap_mode="r+")),
... }
>>> feature_store = TorchBasedFeatureStore(feature_dict)
>>> feature_store.read("user", torch.tensor([0, 1, 2]))
tensor([0, 1, 2])
>>> feature_store.read("item", torch.tensor([3, 4, 2]))
tensor([3, 4, 2])
"""
super(InMemoryFeatureStore, self).__init__()
super(TorchBasedFeatureStore, self).__init__()
assert isinstance(feature_dict, dict), (
f"feature_dict in InMemoryFeatureStore must be dict, "
f"feature_dict in TorchBasedFeatureStore must be dict, "
f"but got {type(feature_dict)}."
)
for k, v in feature_dict.items():
assert isinstance(
k, str
), f"Key in InMemoryFeatureStore must be str, but got {k}."
), f"Key in TorchBasedFeatureStore must be str, but got {k}."
assert isinstance(v, torch.Tensor), (
f"Value in InMemoryFeatureStore must be torch.Tensor,"
f"Value in TorchBasedFeatureStore must be torch.Tensor,"
f"but got {v}."
)
@@ -105,6 +124,9 @@ class InMemoryFeatureStore(FeatureStore):
def read(self, key: str, ids: torch.Tensor = None):
"""Read a feature from the feature store by index.
The returned feature is always an in-memory tensor, regardless of whether
the stored feature resides in memory or on disk.
Parameters
----------
key : str
......
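The snippet below is a minimal sketch, adapted from the docstring example above, of how the new TorchBasedFeatureStore can be backed by on-disk .npy files; the /tmp paths are illustrative. As the docstring states, read() returns an in-memory tensor even when the feature is memory-mapped from disk.

import numpy as np
import torch
from dgl import graphbolt as gb

# Persist the features to disk as .npy files (illustrative paths).
np.save("/tmp/user.npy", np.arange(0, 5))
np.save("/tmp/item.npy", np.arange(0, 6))

# mmap_mode="r+" memory-maps the files, so the tensors are views of the
# on-disk data rather than in-memory copies.
feature_dict = {
    "user": torch.as_tensor(np.load("/tmp/user.npy", mmap_mode="r+")),
    "item": torch.as_tensor(np.load("/tmp/item.npy", mmap_mode="r+")),
}
feature_store = gb.TorchBasedFeatureStore(feature_dict)

# Reads return regular in-memory tensors, regardless of the backing storage.
print(feature_store.read("user", torch.tensor([0, 1, 2])))  # tensor([0, 1, 2])
print(feature_store.read("item", torch.tensor([3, 4, 2])))  # tensor([3, 4, 2])
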
import os
import tempfile
import numpy as np
import pytest
import torch
from dgl import graphbolt as gb
def test_in_memory_feature_store():
a = torch.tensor([1, 2, 3])
b = torch.tensor([3, 4, 5])
c = torch.tensor([[1, 2, 3], [4, 5, 6]])
feature_store = gb.InMemoryFeatureStore({"a": a, "b": b, "c": c})
assert torch.equal(feature_store.read("a"), torch.tensor([1, 2, 3]))
assert torch.equal(feature_store.read("b"), torch.tensor([3, 4, 5]))
assert torch.equal(
feature_store.read("a", torch.tensor([0, 2])),
torch.tensor([1, 3]),
)
assert torch.equal(
feature_store.read("a", torch.tensor([1, 1])),
torch.tensor([2, 2]),
)
assert torch.equal(
feature_store.read("c", torch.tensor([1])),
torch.tensor([[4, 5, 6]]),
)
feature_store.update("a", torch.tensor([0, 1, 2]))
assert torch.equal(feature_store.read("a"), torch.tensor([0, 1, 2]))
assert torch.equal(
feature_store.read("a", torch.tensor([0, 2])),
torch.tensor([0, 2]),
)
with pytest.raises(AssertionError):
feature_store.read("d")
with pytest.raises(IndexError):
feature_store.read("a", torch.tensor([0, 1, 2, 3]))
def to_on_disk_tensor(test_dir, name, t):
path = os.path.join(test_dir, name + ".npy")
t = t.numpy()
np.save(path, t)
# The PyTorch tensor is a view of the memory-mapped numpy array on disk, so
# it does not hold the data in memory.
t = torch.as_tensor(np.load(path, mmap_mode="r+"))
return t
@pytest.mark.parametrize("in_memory", [True, False])
def test_torch_based_feature_store(in_memory):
with tempfile.TemporaryDirectory() as test_dir:
a = torch.tensor([1, 2, 3])
b = torch.tensor([3, 4, 5])
c = torch.tensor([[1, 2, 3], [4, 5, 6]])
if not in_memory:
a = to_on_disk_tensor(test_dir, "a", a)
b = to_on_disk_tensor(test_dir, "b", b)
c = to_on_disk_tensor(test_dir, "c", c)
feature_store = gb.TorchBasedFeatureStore({"a": a, "b": b, "c": c})
assert torch.equal(feature_store.read("a"), torch.tensor([1, 2, 3]))
assert torch.equal(feature_store.read("b"), torch.tensor([3, 4, 5]))
assert torch.equal(
feature_store.read("a", torch.tensor([0, 2])),
torch.tensor([1, 3]),
)
assert torch.equal(
feature_store.read("a", torch.tensor([1, 1])),
torch.tensor([2, 2]),
)
assert torch.equal(
feature_store.read("c", torch.tensor([1])),
torch.tensor([[4, 5, 6]]),
)
feature_store.update("a", torch.tensor([0, 1, 2]))
assert torch.equal(feature_store.read("a"), torch.tensor([0, 1, 2]))
assert torch.equal(
feature_store.read("a", torch.tensor([0, 2])),
torch.tensor([0, 2]),
)
with pytest.raises(AssertionError):
feature_store.read("d")
with pytest.raises(IndexError):
feature_store.read("a", torch.tensor([0, 1, 2, 3]))
# On Windows, the file is kept open (locked) by numpy.load, so all
# references must be released before the temporary directory is removed.
a = b = c = feature_store = None
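The cleanup at the end of the test reflects a platform quirk worth isolating: np.load(..., mmap_mode="r+") keeps the .npy file open, and on Windows that open handle blocks removal of the temporary directory. Below is a minimal sketch of the same pattern, with illustrative names.

import os
import tempfile

import numpy as np
import torch
from dgl import graphbolt as gb

with tempfile.TemporaryDirectory() as test_dir:
    path = os.path.join(test_dir, "a.npy")
    np.save(path, torch.tensor([1, 2, 3]).numpy())

    # The tensor is a view of the memory-mapped file and keeps it open.
    a = torch.as_tensor(np.load(path, mmap_mode="r+"))
    feature_store = gb.TorchBasedFeatureStore({"a": a})
    assert torch.equal(feature_store.read("a"), torch.tensor([1, 2, 3]))

    # Release every reference to the memory map before the directory is
    # removed; on Windows the open .npy file would otherwise block cleanup.
    a = feature_store = None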