Unverified Commit f971e25a authored by yxy235's avatar yxy235 Committed by GitHub
Browse files

[GraphBolt] Modify Feature class. (#6387)


Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-0-133.us-west-2.compute.internal>
parent d3dd8e37
......@@ -25,18 +25,21 @@ class GPUCachedFeature(Feature):
Examples
--------
>>> import torch
>>> torch_feat = torch.arange(0, 8)
>>> from dgl import graphbolt as gb
>>> torch_feat = torch.arange(10).reshape(2, -1).to("cuda")
>>> cache_size = 5
>>> fallback_feature = TorchBasedFeature(torch_feat)
>>> feature = GPUCachedFeature(fallback_feature, cache_size)
>>> fallback_feature = gb.TorchBasedFeature(torch_feat)
>>> feature = gb.GPUCachedFeature(fallback_feature, cache_size)
>>> feature.read()
tensor([0, 1, 2, 3, 4, 5, 6, 7])
>>> feature.read(torch.tensor([0, 1, 2]))
tensor([0, 1, 2])
>>> feature.update(torch.ones(3, dtype=torch.long),
... torch.tensor([0, 1, 2]))
>>> feature.read(torch.tensor([0, 1, 2, 3]))
tensor([1, 1, 1, 3])
tensor([[0, 1, 2, 3, 4],
[5, 6, 7, 8, 9]], device='cuda:0')
>>> feature.read(torch.tensor([0]).to("cuda"))
tensor([[0, 1, 2, 3, 4]], device='cuda:0')
>>> feature.update(torch.tensor([[1 for _ in range(5)]]).to("cuda"),
... torch.tensor([1]).to("cuda"))
>>> feature.read(torch.tensor([0, 1]).to("cuda"))
tensor([[0, 1, 2, 3, 4],
[1, 1, 1, 1, 1]], device='cuda:0')
"""
super(GPUCachedFeature, self).__init__()
assert isinstance(fallback_feature, Feature), (
......@@ -102,10 +105,6 @@ class GPUCachedFeature(Feature):
value[:size].to("cuda").reshape(self.flat_shape),
)
else:
assert ids.shape[0] == value.shape[0], (
f"ids and value must have the same length, "
f"but got {ids.shape[0]} and {value.shape[0]}."
)
self._fallback_feature.update(value, ids)
self._feature.replace(
ids.to("cuda"), value.to("cuda").reshape(self.flat_shape)
......
......@@ -23,37 +23,45 @@ class TorchBasedFeature(Feature):
----------
torch_feature : torch.Tensor
The torch feature.
Note that the dimension of the tensor should be greater than 1.
Examples
--------
>>> import torch
>>> torch_feat = torch.arange(0, 5)
>>> feature_store = TorchBasedFeature(torch_feat)
>>> feature_store.read()
tensor([0, 1, 2, 3, 4])
>>> feature_store.read(torch.tensor([0, 1, 2]))
tensor([0, 1, 2])
>>> feature_store.update(torch.ones(3, dtype=torch.long),
... torch.tensor([0, 1, 2]))
>>> feature_store.read(torch.tensor([0, 1, 2, 3]))
tensor([1, 1, 1, 3])
>>> from dgl import graphbolt as gb
>>> torch_feat = torch.arange(10).reshape(2, -1)
>>> feature = gb.TorchBasedFeature(torch_feat)
>>> feature.read()
tensor([[0, 1, 2, 3, 4],
[5, 6, 7, 8, 9]])
>>> feature.read(torch.tensor([0]))
tensor([[0, 1, 2, 3, 4]])
>>> feature.update(torch.tensor([[1 for _ in range(5)]]),
... torch.tensor([1]))
>>> feature.read(torch.tensor([0, 1]))
tensor([[0, 1, 2, 3, 4],
[1, 1, 1, 1, 1]])
>>> import numpy as np
>>> arr = np.arange(0, 5)
>>> arr = np.array([[1, 2], [3, 4]])
>>> np.save("/tmp/arr.npy", arr)
>>> torch_feat = torch.as_tensor(np.load("/tmp/arr.npy",
... mmap_mode="r+"))
>>> feature_store = TorchBasedFeature(torch_feat)
>>> feature_store.read()
tensor([0, 1, 2, 3, 4])
>>> feature_store.read(torch.tensor([0, 1, 2]))
tensor([0, 1, 2])
>>> torch_feat = torch.from_numpy(np.load("/tmp/arr.npy", mmap_mode="r+"))
>>> feature = gb.TorchBasedFeature(torch_feat)
>>> feature.read()
tensor([[1, 2],
[3, 4]])
>>> feature.read(torch.tensor([0]))
tensor([[1, 2]])
"""
super().__init__()
assert isinstance(torch_feature, torch.Tensor), (
f"torch_feature in TorchBasedFeature must be torch.Tensor, "
f"but got {type(torch_feature)}."
)
assert torch_feature.dim() > 1, (
f"dimension of torch_feature in TorchBasedFeature must be greater "
f"than 1, but got {torch_feature.dim()} dimension."
)
self._tensor = torch_feature
def read(self, ids: torch.Tensor = None):
......@@ -103,6 +111,7 @@ class TorchBasedFeature(Feature):
f"ids and value must have the same length, "
f"but got {ids.shape[0]} and {value.shape[0]}."
)
# [Todo] Check the value feature size matches tesnsor's one.
self._tensor[ids] = value
......@@ -136,7 +145,7 @@ class TorchBasedFeatureStore(BasicFeatureStore):
>>> import torch
>>> import numpy as np
>>> from dgl import graphbolt as gb
>>> edge_label = torch.tensor([1, 2, 3])
>>> edge_label = torch.tensor([[1], [2], [3]])
>>> node_feat = torch.tensor([[1, 2, 3], [4, 5, 6]])
>>> torch.save(edge_label, "/tmp/edge_label.pt")
>>> np.save("/tmp/node_feat.npy", node_feat.numpy())
......
......@@ -5,8 +5,8 @@ from dgl import graphbolt as gb
def test_basic_feature_store_homo():
a = torch.tensor([3, 2, 1])
b = torch.tensor([2, 5, 3])
a = torch.tensor([[1, 2, 4], [2, 5, 3]])
b = torch.tensor([[[1, 2], [3, 4]], [[2, 5], [4, 3]]])
features = {}
features[("node", None, "a")] = gb.TorchBasedFeature(a)
......@@ -16,55 +16,63 @@ def test_basic_feature_store_homo():
# Test read the entire feature.
assert torch.equal(
feature_store.read("node", None, "a"), torch.tensor([3, 2, 1])
feature_store.read("node", None, "a"),
torch.tensor([[1, 2, 4], [2, 5, 3]]),
)
assert torch.equal(
feature_store.read("node", None, "b"), torch.tensor([2, 5, 3])
feature_store.read("node", None, "b"),
torch.tensor([[[1, 2], [3, 4]], [[2, 5], [4, 3]]]),
)
# Test read with ids.
assert torch.equal(
feature_store.read("node", None, "a", torch.tensor([0, 1])),
torch.tensor([3, 2]),
feature_store.read("node", None, "a", torch.tensor([0])),
torch.tensor([[1, 2, 4]]),
)
assert torch.equal(
feature_store.read("node", None, "b", torch.tensor([0])),
torch.tensor([[[1, 2], [3, 4]]]),
)
def test_basic_feature_store_hetero():
a = torch.tensor([3, 2, 1])
b = torch.tensor([2, 5, 3])
c = torch.tensor([6, 8, 9])
a = torch.tensor([[1, 2, 4], [2, 5, 3]])
b = torch.tensor([[[6], [8]], [[8], [9]]])
features = {}
features[("node", "paper", "a")] = gb.TorchBasedFeature(a)
features[("node", "author", "b")] = gb.TorchBasedFeature(b)
features[("edge", "paper:cites:paper", "c")] = gb.TorchBasedFeature(c)
features[("node", "author", "a")] = gb.TorchBasedFeature(a)
features[("edge", "paper:cites:paper", "b")] = gb.TorchBasedFeature(b)
feature_store = gb.BasicFeatureStore(features)
# Test read the entire feature.
assert torch.equal(
feature_store.read("node", "paper", "a"), torch.tensor([3, 2, 1])
)
assert torch.equal(
feature_store.read("node", "author", "b"), torch.tensor([2, 5, 3])
feature_store.read("node", "author", "a"),
torch.tensor([[1, 2, 4], [2, 5, 3]]),
)
assert torch.equal(
feature_store.read("edge", "paper:cites:paper", "c"),
torch.tensor([6, 8, 9]),
feature_store.read("edge", "paper:cites:paper", "b"),
torch.tensor([[[6], [8]], [[8], [9]]]),
)
# Test read with ids.
assert torch.equal(
feature_store.read("node", "paper", "a", torch.tensor([0, 1])),
torch.tensor([3, 2]),
feature_store.read("node", "author", "a", torch.tensor([0])),
torch.tensor([[1, 2, 4]]),
)
def test_basic_feature_store_errors():
a = torch.tensor([3, 2, 1])
b = torch.tensor([2, 5, 3])
b = torch.tensor([[1, 2, 4], [2, 5, 3]])
features = {}
# Test error when dimension of the value is illegal.
with pytest.raises(
AssertionError,
match=rf"dimension of torch_feature in TorchBasedFeature must be "
rf"greater than 1, but got {a.dim()} dimension.",
):
features[("node", "paper", "a")] = gb.TorchBasedFeature(a)
features[("node", "author", "b")] = gb.TorchBasedFeature(b)
......@@ -76,4 +84,4 @@ def test_basic_feature_store_errors():
# Test error when at least one id is out of bound.
with pytest.raises(IndexError):
feature_store.read("node", "paper", "a", torch.tensor([0, 3]))
feature_store.read("node", "author", "b", torch.tensor([0, 3]))
......@@ -12,8 +12,8 @@ from dgl import graphbolt as gb
reason="GPUCachedFeature requires a GPU.",
)
def test_gpu_cached_feature():
a = torch.tensor([1, 2, 3]).to("cuda").float()
b = torch.tensor([[1, 2, 3], [4, 5, 6]]).to("cuda").float()
a = torch.tensor([[1, 2, 3], [4, 5, 6]]).to("cuda").float()
b = torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]]).to("cuda").float()
feat_store_a = gb.GPUCachedFeature(gb.TorchBasedFeature(a), 2)
feat_store_b = gb.GPUCachedFeature(gb.TorchBasedFeature(b), 1)
......@@ -24,28 +24,30 @@ def test_gpu_cached_feature():
# Test read with ids.
assert torch.equal(
feat_store_a.read(torch.tensor([0, 2]).to("cuda")),
torch.tensor([1.0, 3.0]).to("cuda"),
feat_store_a.read(torch.tensor([0]).to("cuda")),
torch.tensor([[1.0, 2.0, 3.0]]).to("cuda"),
)
assert torch.equal(
feat_store_a.read(torch.tensor([1, 1]).to("cuda")),
torch.tensor([2.0, 2.0]).to("cuda"),
)
assert torch.equal(
feat_store_b.read(torch.tensor([1]).to("cuda")),
torch.tensor([[4.0, 5.0, 6.0]]).to("cuda"),
feat_store_b.read(torch.tensor([1, 1]).to("cuda")),
torch.tensor([[[4.0, 5.0], [6.0, 7.0]], [[4.0, 5.0], [6.0, 7.0]]]).to(
"cuda"
),
)
# Test update the entire feature.
feat_store_a.update(torch.tensor([0.0, 1.0, 2.0]).to("cuda"))
feat_store_a.update(
torch.tensor([[0.0, 1.0, 2.0], [3.0, 5.0, 2.0]]).to("cuda")
)
assert torch.equal(
feat_store_a.read(), torch.tensor([0.0, 1.0, 2.0]).to("cuda")
feat_store_a.read(),
torch.tensor([[0.0, 1.0, 2.0], [3.0, 5.0, 2.0]]).to("cuda"),
)
# Test update with ids.
feat_store_a.update(
torch.tensor([2.0, 0.0]).to("cuda"), torch.tensor([0, 2]).to("cuda")
torch.tensor([[2.0, 0.0, 1.0]]).to("cuda"), torch.tensor([0]).to("cuda")
)
assert torch.equal(
feat_store_a.read(), torch.tensor([2.0, 1.0, 0.0]).to("cuda")
feat_store_a.read(),
torch.tensor([[2.0, 0.0, 1.0], [3.0, 5.0, 2.0]]).to("cuda"),
)
import os
import pickle
import random
import re
import tempfile
import unittest
......@@ -764,7 +765,9 @@ def test_OnDiskDataset_Feature_heterograph():
node_data_paper = np.random.rand(1000, 10)
node_data_paper_path = os.path.join(test_dir, "node_data_paper.npy")
np.save(node_data_paper_path, node_data_paper)
node_data_label = np.random.randint(0, 10, size=1000)
node_data_label = torch.tensor(
[[random.randint(0, 10)] for _ in range(1000)]
)
node_data_label_path = os.path.join(test_dir, "node_data_label.npy")
np.save(node_data_label_path, node_data_label)
......@@ -772,7 +775,9 @@ def test_OnDiskDataset_Feature_heterograph():
edge_data_writes = np.random.rand(1000, 10)
edge_data_writes_path = os.path.join(test_dir, "edge_writes_paper.npy")
np.save(edge_data_writes_path, edge_data_writes)
edge_data_label = np.random.randint(0, 10, size=1000)
edge_data_label = torch.tensor(
[[random.randint(0, 10)] for _ in range(1000)]
)
edge_data_label_path = os.path.join(test_dir, "edge_data_label.npy")
np.save(edge_data_label_path, edge_data_label)
......@@ -846,7 +851,9 @@ def test_OnDiskDataset_Feature_homograph():
node_data_feat = np.random.rand(1000, 10)
node_data_feat_path = os.path.join(test_dir, "node_data_feat.npy")
np.save(node_data_feat_path, node_data_feat)
node_data_label = np.random.randint(0, 10, size=1000)
node_data_label = torch.tensor(
[[random.randint(0, 10)] for _ in range(1000)]
)
node_data_label_path = os.path.join(test_dir, "node_data_label.npy")
np.save(node_data_label_path, node_data_label)
......@@ -854,7 +861,9 @@ def test_OnDiskDataset_Feature_homograph():
edge_data_feat = np.random.rand(1000, 10)
edge_data_feat_path = os.path.join(test_dir, "edge_data_feat.npy")
np.save(edge_data_feat_path, edge_data_feat)
edge_data_label = np.random.randint(0, 10, size=1000)
edge_data_label = torch.tensor(
[[random.randint(0, 10)] for _ in range(1000)]
)
edge_data_label_path = os.path.join(test_dir, "edge_data_label.npy")
np.save(edge_data_label_path, edge_data_label)
......
......@@ -22,8 +22,8 @@ def to_on_disk_tensor(test_dir, name, t):
@pytest.mark.parametrize("in_memory", [True, False])
def test_torch_based_feature(in_memory):
with tempfile.TemporaryDirectory() as test_dir:
a = torch.tensor([1, 2, 3])
b = torch.tensor([[1, 2, 3], [4, 5, 6]])
a = torch.tensor([[1, 2, 3], [4, 5, 6]])
b = torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]])
if not in_memory:
a = to_on_disk_tensor(test_dir, "a", a)
b = to_on_disk_tensor(test_dir, "b", b)
......@@ -31,26 +31,31 @@ def test_torch_based_feature(in_memory):
feature_a = gb.TorchBasedFeature(a)
feature_b = gb.TorchBasedFeature(b)
assert torch.equal(feature_a.read(), torch.tensor([1, 2, 3]))
# Read the entire feature.
assert torch.equal(
feature_b.read(), torch.tensor([[1, 2, 3], [4, 5, 6]])
feature_a.read(), torch.tensor([[1, 2, 3], [4, 5, 6]])
)
assert torch.equal(
feature_a.read(torch.tensor([0, 2])),
torch.tensor([1, 3]),
feature_b.read(), torch.tensor([[[1, 2], [3, 4]], [[4, 5], [6, 7]]])
)
# Read the feature with ids.
assert torch.equal(
feature_a.read(torch.tensor([1, 1])),
torch.tensor([2, 2]),
feature_a.read(torch.tensor([0])),
torch.tensor([[1, 2, 3]]),
)
assert torch.equal(
feature_b.read(torch.tensor([1])),
torch.tensor([[4, 5, 6]]),
torch.tensor([[[4, 5], [6, 7]]]),
)
# Update the feature with ids.
feature_a.update(torch.tensor([[0, 1, 2]]), torch.tensor([0]))
assert torch.equal(
feature_a.read(), torch.tensor([[0, 1, 2], [4, 5, 6]])
)
feature_b.update(torch.tensor([[[1, 2], [3, 4]]]), torch.tensor([1]))
assert torch.equal(
feature_b.read(), torch.tensor([[[1, 2], [3, 4]], [[1, 2], [3, 4]]])
)
feature_a.update(torch.tensor([0, 1, 2]), torch.tensor([0, 1, 2]))
assert torch.equal(feature_a.read(), torch.tensor([0, 1, 2]))
feature_a.update(torch.tensor([2, 0]), torch.tensor([0, 2]))
assert torch.equal(feature_a.read(), torch.tensor([2, 1, 0]))
with pytest.raises(IndexError):
feature_a.read(torch.tensor([0, 1, 2, 3]))
......@@ -74,8 +79,8 @@ def write_tensor_to_disk(dir, name, t, fmt="torch"):
@pytest.mark.parametrize("in_memory", [True, False])
def test_torch_based_feature_store(in_memory):
with tempfile.TemporaryDirectory() as test_dir:
a = torch.tensor([1, 2, 3])
b = torch.tensor([2, 5, 3])
a = torch.tensor([[1, 2, 4], [2, 5, 3]])
b = torch.tensor([[[1, 2], [3, 4]], [[2, 5], [3, 4]]])
write_tensor_to_disk(test_dir, "a", a, fmt="torch")
write_tensor_to_disk(test_dir, "b", b, fmt="numpy")
feature_data = [
......@@ -98,17 +103,18 @@ def test_torch_based_feature_store(in_memory):
]
feature_store = gb.TorchBasedFeatureStore(feature_data)
assert torch.equal(
feature_store.read("node", "paper", "a"), torch.tensor([1, 2, 3])
feature_store.read("node", "paper", "a"),
torch.tensor([[1, 2, 4], [2, 5, 3]]),
)
assert torch.equal(
feature_store.read("edge", "paper:cites:paper", "b"),
torch.tensor([2, 5, 3]),
torch.tensor([[[1, 2], [3, 4]], [[2, 5], [3, 4]]]),
)
# For windows, the file is locked by the numpy.load. We need to delete
# it before closing the temporary directory.
a = b = None
feature_stores = None
feature_store = None
# ``domain`` should be enum.
with pytest.raises(pydantic.ValidationError):
......@@ -133,6 +139,7 @@ def test_torch_based_feature_store(in_memory):
]
feature_store = gb.TorchBasedFeatureStore(feature_data)
assert torch.equal(
feature_store.read("node", None, "a"), torch.tensor([1, 2, 3])
feature_store.read("node", None, "a"),
torch.tensor([[1, 2, 4], [2, 5, 3]]),
)
feature_stores = None
feature_store = None
import random
import dgl.graphbolt as gb
import gb_test_utils
import torch
......@@ -7,8 +9,12 @@ from torchdata.datapipes.iter import Mapper
def test_FeatureFetcher_invoke():
# Prepare graph and required datapipes.
graph = gb_test_utils.rand_csc_graph(20, 0.15)
a = torch.randint(0, 10, (graph.total_num_nodes,))
b = torch.randint(0, 10, (graph.total_num_edges,))
a = torch.tensor(
[[random.randint(0, 10)] for _ in range(graph.total_num_nodes)]
)
b = torch.tensor(
[[random.randint(0, 10)] for _ in range(graph.total_num_edges)]
)
features = {}
keys = [("node", None, "a"), ("edge", None, "b")]
......@@ -35,8 +41,12 @@ def test_FeatureFetcher_invoke():
def test_FeatureFetcher_homo():
graph = gb_test_utils.rand_csc_graph(20, 0.15)
a = torch.randint(0, 10, (graph.total_num_nodes,))
b = torch.randint(0, 10, (graph.total_num_edges,))
a = torch.tensor(
[[random.randint(0, 10)] for _ in range(graph.total_num_nodes)]
)
b = torch.tensor(
[[random.randint(0, 10)] for _ in range(graph.total_num_edges)]
)
features = {}
keys = [("node", None, "a"), ("edge", None, "b")]
......@@ -56,8 +66,12 @@ def test_FeatureFetcher_homo():
def test_FeatureFetcher_with_edges_homo():
graph = gb_test_utils.rand_csc_graph(20, 0.15)
a = torch.randint(0, 10, (graph.total_num_nodes,))
b = torch.randint(0, 10, (graph.total_num_edges,))
a = torch.tensor(
[[random.randint(0, 10)] for _ in range(graph.total_num_nodes)]
)
b = torch.tensor(
[[random.randint(0, 10)] for _ in range(graph.total_num_edges)]
)
def add_node_and_edge_ids(seeds):
subgraphs = []
......@@ -116,8 +130,8 @@ def get_hetero_graph():
def test_FeatureFetcher_hetero():
graph = get_hetero_graph()
a = torch.randint(0, 10, (2,))
b = torch.randint(0, 10, (3,))
a = torch.tensor([[random.randint(0, 10)] for _ in range(2)])
b = torch.tensor([[random.randint(0, 10)] for _ in range(3)])
features = {}
keys = [("node", "n1", "a"), ("node", "n2", "a")]
......@@ -143,8 +157,8 @@ def test_FeatureFetcher_hetero():
def test_FeatureFetcher_with_edges_hetero():
a = torch.randint(0, 10, (20,))
b = torch.randint(0, 10, (50,))
a = torch.tensor([[random.randint(0, 10)] for _ in range(20)])
b = torch.tensor([[random.randint(0, 10)] for _ in range(50)])
def add_node_and_edge_ids(seeds):
subgraphs = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment