Unverified Commit 41a38486 authored by Xinyu Yao's avatar Xinyu Yao Committed by GitHub
Browse files

[GraphBolt] Remove old version `to` in `MiniBatch`. (#7309)


Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-0-133.us-west-2.compute.internal>
parent bc3da371
......@@ -562,49 +562,14 @@ class MiniBatch:
def apply_to(x, device):
return recursive_apply(x, lambda x: _to(x, device))
if self.seed_nodes is not None and self.compacted_node_pairs is None:
# Node related tasks.
transfer_attrs = [
"labels",
"sampled_subgraphs",
"node_features",
"edge_features",
]
if self.labels is None:
# Layerwise inference
transfer_attrs.append("seed_nodes")
elif self.seed_nodes is None and self.compacted_node_pairs is not None:
# Link/edge related tasks.
transfer_attrs = [
"labels",
"compacted_node_pairs",
"compacted_negative_srcs",
"compacted_negative_dsts",
"sampled_subgraphs",
"node_features",
"edge_features",
]
elif self.seeds is not None:
# Node/link/edge related tasks.
transfer_attrs = [
"labels",
"sampled_subgraphs",
"node_features",
"edge_features",
"compacted_seeds",
"indexes",
"seeds",
]
else:
# Otherwise copy all the attributes to the device.
transfer_attrs = get_attributes(self)
for attr in transfer_attrs:
# Only copy member variables.
try:
# For read-only attributes such as blocks and
# node_pairs_with_labels, setattr will throw an AttributeError.
# We catch these exceptions and skip those attributes.
# For read-only attributes such as blocks , setattr will throw
# an AttributeError. We catch these exceptions and skip those
# attributes.
setattr(self, attr, apply_to(getattr(self, attr), device))
except AttributeError:
continue
......
......@@ -29,132 +29,6 @@ def test_CopyTo():
assert data.seeds.device.type == "cuda"
@pytest.mark.parametrize(
"task",
[
"node_classification",
"node_inference",
"link_prediction",
"edge_classification",
"extra_attrs",
],
)
@unittest.skipIf(F._default_context_str == "cpu", "CopyTo needs GPU to test")
def test_CopyToWithMiniBatches_original(task):
N = 16
B = 2
if task == "node_classification" or task == "extra_attrs":
itemset = gb.ItemSet(
(torch.arange(N), torch.arange(N)), names=("seed_nodes", "labels")
)
elif task == "node_inference":
itemset = gb.ItemSet(torch.arange(N), names="seed_nodes")
elif task == "link_prediction":
itemset = gb.ItemSet(
(
torch.arange(2 * N).reshape(-1, 2),
torch.arange(3 * N).reshape(-1, 3),
),
names=("node_pairs", "negative_dsts"),
)
elif task == "edge_classification":
itemset = gb.ItemSet(
(torch.arange(2 * N).reshape(-1, 2), torch.arange(N)),
names=("node_pairs", "labels"),
)
graph = gb_test_utils.rand_csc_graph(100, 0.15, bidirection_edge=True)
features = {}
keys = [("node", None, "a"), ("node", None, "b")]
features[keys[0]] = gb.TorchBasedFeature(torch.randn(200, 4))
features[keys[1]] = gb.TorchBasedFeature(torch.randn(200, 4))
feature_store = gb.BasicFeatureStore(features)
datapipe = gb.ItemSampler(itemset, batch_size=B)
datapipe = gb.NeighborSampler(
datapipe,
graph,
fanouts=[torch.LongTensor([2]) for _ in range(2)],
)
if task != "node_inference":
datapipe = gb.FeatureFetcher(
datapipe,
feature_store,
["a"],
)
if task == "node_classification":
copied_attrs = [
"node_features",
"edge_features",
"sampled_subgraphs",
"labels",
"blocks",
"seeds",
]
elif task == "node_inference":
copied_attrs = [
"seeds",
"sampled_subgraphs",
"blocks",
"labels",
]
elif task == "link_prediction" or task == "edge_classification":
copied_attrs = [
"labels",
"compacted_seeds",
"sampled_subgraphs",
"indexes",
"node_features",
"edge_features",
"blocks",
"seeds",
]
elif task == "extra_attrs":
copied_attrs = [
"node_features",
"edge_features",
"sampled_subgraphs",
"labels",
"blocks",
"seed_nodes",
"seeds",
]
def test_data_device(datapipe):
for data in datapipe:
for attr in dir(data):
var = getattr(data, attr)
if isinstance(var, Mapping):
var = var[next(iter(var))]
elif isinstance(var, Iterable):
var = next(iter(var))
if (
not callable(var)
and not attr.startswith("__")
and hasattr(var, "device")
and var is not None
):
if task == "other":
assert var.device.type == "cuda"
else:
if attr in copied_attrs:
assert var.device.type == "cuda"
else:
assert var.device.type == "cpu"
if task == "extra_attrs":
extra_attrs = ["seed_nodes"]
else:
extra_attrs = None
# Invoke CopyTo via class constructor.
test_data_device(gb.CopyTo(datapipe, "cuda", extra_attrs))
# Invoke CopyTo via functional form.
test_data_device(datapipe.copy_to("cuda", extra_attrs))
@pytest.mark.parametrize(
"task",
[
......@@ -209,23 +83,6 @@ def test_CopyToWithMiniBatches(task):
["a"],
)
if task == "node_classification":
copied_attrs = [
"node_features",
"edge_features",
"sampled_subgraphs",
"labels",
"blocks",
"seeds",
]
elif task == "node_inference":
copied_attrs = [
"seeds",
"sampled_subgraphs",
"blocks",
"labels",
]
elif task == "link_prediction" or task == "edge_classification":
copied_attrs = [
"labels",
"compacted_seeds",
......@@ -235,21 +92,11 @@ def test_CopyToWithMiniBatches(task):
"edge_features",
"blocks",
"seeds",
]
elif task == "extra_attrs":
copied_attrs = [
"node_features",
"edge_features",
"sampled_subgraphs",
"labels",
"blocks",
"seed_nodes",
"seeds",
"input_nodes",
]
def test_data_device(datapipe):
for data in datapipe:
print(data)
for attr in dir(data):
var = getattr(data, attr)
if isinstance(var, Mapping):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment