Unverified Commit a0e8cf0d authored by Quan (Andy) Gan, committed by GitHub

[Bugfix] Fix UVA sampling with lazy feature slicing (#3862)



* fix uva sampling with features

* fix

* add is_listlike function to distinguish strings from sequences

* fix
Co-authored-by: nv-dlasalle <63612878+nv-dlasalle@users.noreply.github.com>
parent 223a3da5
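For context, the scenario this fix targets looks roughly like the following (a minimal sketch assuming a PyTorch backend with a CUDA device; it mirrors the `test_subframes` test added further down): the graph is pinned for UVA sampling, sampling is driven by GPU indices, and the lazily sliced features must follow the subgraph's device.

```python
import torch
import dgl

g = dgl.graph((torch.tensor([1, 2, 3]), torch.tensor([2, 3, 4])))
g.ndata['x'] = torch.randn(5, 4)
g.edata['a'] = torch.randn(3, 6)

g.create_formats_()
g.pin_memory_()                    # UVA: structure (and now feature columns) pinned on host

idx = torch.tensor([1, 2]).cuda()
sg = g.sample_neighbors(idx, 2)    # sampling driven by GPU indices
# Before this fix, lazy feature slicing could leave sg.ndata['x'] on the CPU.
assert sg.ndata['x'].device == sg.device
assert sg.edata['a'].device == sg.device
```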
"""Base classes and functionalities for dataloaders"""
from collections import Mapping
from collections.abc import Mapping
import inspect
from ..base import NID, EID
from ..convert import heterograph
......
......@@ -21,7 +21,7 @@ from ..heterograph import DGLHeteroGraph
from .. import ndarray as nd
from ..utils import (
recursive_apply, ExceptionWrapper, recursive_apply_pair, set_num_threads,
create_shared_mem_array, get_shared_mem_array, context_of, pin_memory_inplace)
create_shared_mem_array, get_shared_mem_array, context_of)
from ..frame import LazyFeature
from ..storages import wrap_storage
from .base import BlockSampler, as_edge_prediction_sampler
......@@ -730,9 +730,6 @@ class DataLoader(torch.utils.data.DataLoader):
# will need to do that themselves.
self.graph.create_formats_()
self.graph.pin_memory_()
for frame in itertools.chain(self.graph._node_frames, self.graph._edge_frames):
for col in frame._columns.values():
pin_memory_inplace(col.data)
else:
if self.graph.device != indices_device:
raise ValueError(
......
......@@ -8,6 +8,7 @@ from . import backend as F
from .base import DGLError, dgl_warning
from .init import zero_initializer
from .storages import TensorStorage
from .utils import gather_pinned_tensor_rows, pin_memory_inplace, unpin_memory_inplace
class _LazyIndex(object):
def __init__(self, index):
......@@ -186,7 +187,7 @@ class Column(TensorStorage):
self.scheme = scheme if scheme else infer_scheme(storage)
self.index = index
self.device = device
self.pinned = False
self.pinned_by_dgl = False
def __len__(self):
"""The number of features (number of rows) in this column."""
......@@ -206,15 +207,23 @@ class Column(TensorStorage):
if self.index is not None:
if isinstance(self.index, _LazyIndex):
self.index = self.index.flatten()
# If the index and storage are not in the same context,
# copy the index to the storage's context.
# Copying the index is usually cheaper than copying the data.
if F.context(self.storage) != F.context(self.index):
kwargs = {}
if self.device is not None:
kwargs = self.device[1]
self.index = F.copy_to(self.index, F.context(self.storage), **kwargs)
self.storage = F.gather_row(self.storage, self.index)
storage_ctx = F.context(self.storage)
index_ctx = F.context(self.index)
# In the special case where the storage is pinned and the index is on
# CUDA, directly call UVA slicing (even if they are not in the same context).
if storage_ctx != index_ctx and storage_ctx == F.cpu() and F.is_pinned(self.storage):
self.storage = gather_pinned_tensor_rows(self.storage, self.index)
else:
# If the index and storage are not in the same context,
# copy the index to the storage's context.
# Copying the index is usually cheaper than copying the data.
if storage_ctx != index_ctx:
kwargs = {}
if self.device is not None:
kwargs = self.device[1]
self.index = F.copy_to(self.index, storage_ctx, **kwargs)
self.storage = F.gather_row(self.storage, self.index)
self.index = None
# move data to the right device
......@@ -228,7 +237,7 @@ class Column(TensorStorage):
"""Update the column data."""
self.index = None
self.storage = val
self.pinned = False
self.pinned_by_dgl = False
def to(self, device, **kwargs): # pylint: disable=invalid-name
""" Return a new column with columns copy to the targeted device (cpu/gpu).
......@@ -376,9 +385,28 @@ class Column(TensorStorage):
def __copy__(self):
return self.clone()
def fetch(self, indices, device, pin_memory=False):
def fetch(self, indices, device, pin_memory=False, **kwargs):
_ = self.data # materialize in case of lazy slicing & data transfer
return super().fetch(indices, device, pin_memory=False)
return super().fetch(indices, device, pin_memory=False, **kwargs)
def pin_memory_(self):
"""Pin the storage into page-locked memory.
Does nothing if the storage is already pinned.
"""
if not self.pinned_by_dgl and not F.is_pinned(self.data):
pin_memory_inplace(self.data)
self.pinned_by_dgl = True
def unpin_memory_(self):
"""Unpin the storage pinned by ``pin_memory_`` method.
Does nothing if the storage is not pinned by ``pin_memory_`` method, even if
it is actually in page-locked memory.
"""
if self.pinned_by_dgl:
unpin_memory_inplace(self.data)
self.pinned_by_dgl = False
class Frame(MutableMapping):
"""The columnar storage for node/edge features.
......
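The dispatch above boils down to: when the storage is pinned host memory and the index is on a CUDA device, rows are gathered directly over the bus instead of copying the index back to the CPU. A minimal sketch of that path, assuming `dgl.utils` exposes `pin_memory_inplace` and `gather_pinned_tensor_rows` as imported in this diff:

```python
import torch
from dgl.utils import pin_memory_inplace, gather_pinned_tensor_rows

feat = torch.randn(1000, 16)                  # CPU feature storage
pin_memory_inplace(feat)                      # page-lock it for zero-copy GPU access
rows = torch.tensor([3, 7, 42], device='cuda')

out = gather_pinned_tensor_rows(feat, rows)   # UVA gather; result lands on the GPU
assert out.device.type == 'cuda'
assert out.shape == (3, 16)
```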
......@@ -5,6 +5,7 @@ from collections.abc import Mapping, Iterable
from contextlib import contextmanager
import copy
import numbers
import itertools
import networkx as nx
import numpy as np
......@@ -4121,6 +4122,15 @@ class DGLHeteroGraph(object):
raise DGLError('Cannot assign node feature "{}" on device {} to a graph on'
' device {}. Call DGLGraph.to() to copy the graph to the'
' same device.'.format(key, F.context(val), self.device))
# To prevent users from doing things like:
#
# g.pin_memory_()
# g.ndata['x'] = torch.randn(...)
# sg = g.sample_neighbors(torch.LongTensor([...]).cuda())
# sg.ndata['x'] # Becomes a CPU tensor even if sg is on GPU due to lazy slicing
if self.is_pinned() and F.context(val) == 'cpu' and not F.is_pinned(val):
raise DGLError('Pinned graph requires the node data to be pinned as well. '
'Please pin the node data before assignment.')
if is_all(u):
self._node_frames[ntid].update(data)
......@@ -4213,6 +4223,15 @@ class DGLHeteroGraph(object):
raise DGLError('Cannot assign edge feature "{}" on device {} to a graph on'
' device {}. Call DGLGraph.to() to copy the graph to the'
' same device.'.format(key, F.context(val), self.device))
# To prevent users from doing things like:
#
# g.pin_memory_()
# g.edata['x'] = torch.randn(...)
# sg = g.sample_neighbors(torch.LongTensor([...]).cuda())
# sg.edata['x'] # Becomes a CPU tensor even if sg is on GPU due to lazy slicing
if self.is_pinned() and F.context(val) == 'cpu' and not F.is_pinned(val):
raise DGLError('Pinned graph requires the edge data to be pinned as well. '
'Please pin the edge data before assignment.')
# set
if is_all(edges):
......@@ -5467,7 +5486,8 @@ class DGLHeteroGraph(object):
return self.to(F.cpu())
def pin_memory_(self):
"""Pin the graph structure to the page-locked memory for GPU zero-copy access.
"""Pin the graph structure and node/edge data to the page-locked memory for
GPU zero-copy access.
This is an **inplace** method. The graph structure must be on CPU to be pinned.
If the graph structure is already pinned, the function directly returns it.
......@@ -5530,13 +5550,16 @@ class DGLHeteroGraph(object):
if F.device_type(self.device) != 'cpu':
raise DGLError("The graph structure must be on CPU to be pinned.")
self._graph.pin_memory_()
for frame in itertools.chain(self._node_frames, self._edge_frames):
for col in frame._columns.values():
col.pin_memory_()
return self
def unpin_memory_(self):
"""Unpin the graph structure from the page-locked memory.
"""Unpin the graph structure and node/edge data from the page-locked memory.
This is an **inplace** method.If the graph struture is not pinned,
This is an **inplace** method. If the graph structure is not pinned,
e.g., on CPU or GPU, the function directly returns it.
Returns
......@@ -5547,6 +5570,9 @@ class DGLHeteroGraph(object):
if not self._graph.is_pinned():
return self
self._graph.unpin_memory_()
for frame in itertools.chain(self._node_frames, self._edge_frames):
for col in frame._columns.values():
col.unpin_memory_()
return self
......
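Taken together with the assignment checks above, pinning the graph now also pins its feature columns, and unpinning releases only what DGL itself pinned. A small sketch of the intended behavior (PyTorch backend and CUDA assumed):

```python
import torch
import dgl

g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
g.ndata['x'] = torch.randn(3, 4)
g.create_formats_()
g.pin_memory_()            # pins the structure and the existing 'x' column

# Assigning unpinned CPU data to a pinned graph is now rejected, so lazily
# sliced features can no longer silently end up on the CPU after GPU sampling.
try:
    g.ndata['y'] = torch.randn(3, 4)
except dgl.DGLError:
    pass

g.unpin_memory_()          # releases only the columns DGL itself pinned
```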
......@@ -107,7 +107,9 @@ def sample_etype_neighbors(g, nodes, etype_field, fanout, edge_dir='in', prob=No
if isinstance(nodes, dict):
assert len(nodes) == 1, "The input graph should not have node types"
nodes = list(nodes.values())[0]
nodes = F.to_dgl_nd(utils.prepare_tensor(g, nodes, 'nodes'))
nodes = utils.prepare_tensor(g, nodes, 'nodes')
device = utils.context_of(nodes)
nodes = F.to_dgl_nd(nodes)
# treat etypes as int32, it is much cheaper than int64
# TODO(xiangsx): int8 can be a better choice.
etypes = F.to_dgl_nd(F.astype(g.edata[etype_field], ty=F.int32))
......@@ -135,7 +137,7 @@ def sample_etype_neighbors(g, nodes, etype_field, fanout, edge_dir='in', prob=No
# only set the edge IDs.
if not _dist_training:
if copy_ndata:
node_frames = utils.extract_node_subframes(g, None)
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
......@@ -313,7 +315,8 @@ def _sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False,
raise ValueError(
"Got an empty dictionary in the nodes argument. "
"Please pass in a dictionary with empty tensors as values instead.")
ctx = utils.to_dgl_context(F.context(next(iter(nodes.values()))))
device = utils.context_of(nodes)
ctx = utils.to_dgl_context(device)
nodes_all_types = []
for ntype in g.ntypes:
if ntype in nodes:
......@@ -373,7 +376,7 @@ def _sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False,
# only set the edge IDs.
if not _dist_training:
if copy_ndata:
node_frames = utils.extract_node_subframes(g, None)
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
......@@ -531,6 +534,7 @@ def sample_neighbors_biased(g, nodes, fanout, bias, edge_dir='in',
nodes = F.tensor(nodes)
if isinstance(bias, list):
bias = F.tensor(bias)
device = utils.context_of(nodes)
nodes_array = F.to_dgl_nd(nodes)
bias_array = F.to_dgl_nd(bias)
......@@ -547,7 +551,7 @@ def sample_neighbors_biased(g, nodes, fanout, bias, edge_dir='in',
ret = DGLHeteroGraph(subgidx.graph, g.ntypes, g.etypes)
if copy_ndata:
node_frames = utils.extract_node_subframes(g, None)
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
......@@ -651,6 +655,7 @@ def select_topk(g, k, weight, nodes=None, edge_dir='in', ascending=False,
# Parse nodes into a list of NDArrays.
nodes = utils.prepare_tensor_dict(g, nodes, 'nodes')
device = utils.context_of(nodes)
nodes_all_types = []
for ntype in g.ntypes:
if ntype in nodes:
......@@ -684,7 +689,7 @@ def select_topk(g, k, weight, nodes=None, edge_dir='in', ascending=False,
# handle features
if copy_ndata:
node_frames = utils.extract_node_subframes(g, None)
node_frames = utils.extract_node_subframes(g, device)
utils.set_new_frames(ret, node_frames=node_frames)
if copy_edata:
......
......@@ -62,7 +62,7 @@ class FeatureStorage(object):
"""
return False
def fetch(self, indices, device, pin_memory=False):
def fetch(self, indices, device, pin_memory=False, **kwargs):
"""Retrieve the features at the given indices.
If :attr:`indices` is a tensor, this is equivalent to
......
......@@ -9,10 +9,12 @@ class NumpyStorage(FeatureStorage):
def __init__(self, arr):
self.arr = arr
def _fetch(self, indices, device, pin_memory=False): # pylint: disable=unused-argument
# pylint: disable=unused-argument
def _fetch(self, indices, device, pin_memory=False):
result = F.zerocopy_from_numpy(self.arr[indices])
result = F.copy_to(result, device)
return result
def fetch(self, indices, device, pin_memory=False):
# pylint: disable=unused-argument
def fetch(self, indices, device, pin_memory=False, **kwargs):
return ThreadedFuture(target=self._fetch, args=(indices, device, pin_memory))
......@@ -5,21 +5,22 @@ from .base import register_storage_wrapper
from .tensor import BaseTensorStorage
from ..utils import gather_pinned_tensor_rows
def _fetch_cpu(indices, tensor, feature_shape, device, pin_memory):
def _fetch_cpu(indices, tensor, feature_shape, device, pin_memory, **kwargs):
result = torch.empty(
indices.shape[0], *feature_shape, dtype=tensor.dtype,
pin_memory=pin_memory)
torch.index_select(tensor, 0, indices, out=result)
result = result.to(device, non_blocking=True)
kwargs['non_blocking'] = pin_memory
result = result.to(device, **kwargs)
return result
def _fetch_cuda(indices, tensor, device):
return torch.index_select(tensor, 0, indices).to(device)
def _fetch_cuda(indices, tensor, device, **kwargs):
return torch.index_select(tensor, 0, indices).to(device, **kwargs)
@register_storage_wrapper(torch.Tensor)
class PyTorchTensorStorage(BaseTensorStorage):
"""Feature storages for slicing a PyTorch tensor."""
def fetch(self, indices, device, pin_memory=False):
def fetch(self, indices, device, pin_memory=False, **kwargs):
device = torch.device(device)
storage_device_type = self.storage.device.type
indices_device_type = indices.device.type
......@@ -36,7 +37,7 @@ class PyTorchTensorStorage(BaseTensorStorage):
# CPU to CPU or CUDA - use pin_memory and async transfer if possible
else:
return _fetch_cpu(indices, self.storage, self.storage.shape[1:], device,
pin_memory)
pin_memory, **kwargs)
else:
# CUDA to CUDA or CPU
return _fetch_cuda(indices, self.storage, device)
return _fetch_cuda(indices, self.storage, device, **kwargs)
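For reference, the forwarded `**kwargs` end up in the final device copy, so callers can, for example, request an asynchronous transfer. A self-contained restatement of the CPU path (kept runnable without CUDA by copying to CPU; the real callers are DGL's dataloader internals):

```python
import torch

def _fetch_cpu(indices, tensor, feature_shape, device, pin_memory, **kwargs):
    # Stage the selected rows in an (optionally pinned) buffer, then copy to
    # the target device; non_blocking only helps when the buffer is pinned.
    result = torch.empty(indices.shape[0], *feature_shape,
                         dtype=tensor.dtype, pin_memory=pin_memory)
    torch.index_select(tensor, 0, indices, out=result)
    kwargs['non_blocking'] = pin_memory
    return result.to(device, **kwargs)

feat = torch.randn(100, 8)
rows = torch.tensor([0, 5, 9])
out = _fetch_cpu(rows, feat, feat.shape[1:], 'cpu', pin_memory=False)
assert out.shape == (3, 8)
```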
......@@ -9,5 +9,5 @@ class BaseTensorStorage(FeatureStorage):
def __init__(self, tensor):
self.storage = tensor
def fetch(self, indices, device, pin_memory=False): # pylint: disable=unused-argument
return F.copy_to(F.gather_row(self.storage, indices), device)
def fetch(self, indices, device, pin_memory=False, **kwargs): # pylint: disable=unused-argument
return F.copy_to(F.gather_row(self.storage, indices), device, **kwargs)
......@@ -13,7 +13,7 @@ from . import heterograph_index
from . import ndarray as nd
from .heterograph import DGLHeteroGraph
from . import utils
from .utils import recursive_apply
from .utils import recursive_apply, context_of
__all__ = ['node_subgraph', 'edge_subgraph', 'node_type_subgraph', 'edge_type_subgraph',
'in_subgraph', 'out_subgraph', 'khop_in_subgraph', 'khop_out_subgraph']
......@@ -147,13 +147,14 @@ def node_subgraph(graph, nodes, *, relabel_nodes=True, store_ids=True, output_de
for ntype in graph.ntypes:
nids = nodes.get(ntype, F.copy_to(F.tensor([], graph.idtype), graph.device))
induced_nodes.append(_process_nodes(ntype, nids))
device = context_of(induced_nodes)
sgi = graph._graph.node_subgraph(induced_nodes, relabel_nodes)
induced_edges = sgi.induced_edges
# (BarclayII) should not write induced_nodes = sgi.induced_nodes due to the same
# bug in #1453.
if not relabel_nodes:
induced_nodes = None
subg = _create_hetero_subgraph(graph, sgi, induced_nodes, induced_edges, store_ids=store_ids)
induced_nodes_or_device = induced_nodes if relabel_nodes else device
subg = _create_hetero_subgraph(
graph, sgi, induced_nodes_or_device, induced_edges, store_ids=store_ids)
return subg if output_device is None else subg.to(output_device)
DGLHeteroGraph.subgraph = utils.alias_func(node_subgraph)
......@@ -306,9 +307,11 @@ def edge_subgraph(graph, edges, *, relabel_nodes=True, store_ids=True, output_de
for cetype in graph.canonical_etypes:
eids = edges.get(cetype, F.copy_to(F.tensor([], graph.idtype), graph.device))
induced_edges.append(_process_edges(cetype, eids))
device = context_of(induced_edges)
sgi = graph._graph.edge_subgraph(induced_edges, not relabel_nodes)
induced_nodes = sgi.induced_nodes if relabel_nodes else None
subg = _create_hetero_subgraph(graph, sgi, induced_nodes, induced_edges, store_ids=store_ids)
induced_nodes_or_device = sgi.induced_nodes if relabel_nodes else device
subg = _create_hetero_subgraph(
graph, sgi, induced_nodes_or_device, induced_edges, store_ids=store_ids)
return subg if output_device is None else subg.to(output_device)
DGLHeteroGraph.edge_subgraph = utils.alias_func(edge_subgraph)
......@@ -426,6 +429,7 @@ def in_subgraph(graph, nodes, *, relabel_nodes=False, store_ids=True, output_dev
raise DGLError("Must specify node type when the graph is not homogeneous.")
nodes = {graph.ntypes[0] : nodes}
nodes = utils.prepare_tensor_dict(graph, nodes, 'nodes')
device = context_of(nodes)
nodes_all_types = []
for ntype in graph.ntypes:
if ntype in nodes:
......@@ -434,9 +438,10 @@ def in_subgraph(graph, nodes, *, relabel_nodes=False, store_ids=True, output_dev
nodes_all_types.append(nd.NULL[graph._idtype_str])
sgi = _CAPI_DGLInSubgraph(graph._graph, nodes_all_types, relabel_nodes)
induced_nodes = sgi.induced_nodes if relabel_nodes else None
induced_nodes_or_device = sgi.induced_nodes if relabel_nodes else device
induced_edges = sgi.induced_edges
subg = _create_hetero_subgraph(graph, sgi, induced_nodes, induced_edges, store_ids=store_ids)
subg = _create_hetero_subgraph(
graph, sgi, induced_nodes_or_device, induced_edges, store_ids=store_ids)
return subg if output_device is None else subg.to(output_device)
DGLHeteroGraph.in_subgraph = utils.alias_func(in_subgraph)
......@@ -554,6 +559,7 @@ def out_subgraph(graph, nodes, *, relabel_nodes=False, store_ids=True, output_de
raise DGLError("Must specify node type when the graph is not homogeneous.")
nodes = {graph.ntypes[0] : nodes}
nodes = utils.prepare_tensor_dict(graph, nodes, 'nodes')
device = context_of(nodes)
nodes_all_types = []
for ntype in graph.ntypes:
if ntype in nodes:
......@@ -562,9 +568,10 @@ def out_subgraph(graph, nodes, *, relabel_nodes=False, store_ids=True, output_de
nodes_all_types.append(nd.NULL[graph._idtype_str])
sgi = _CAPI_DGLOutSubgraph(graph._graph, nodes_all_types, relabel_nodes)
induced_nodes = sgi.induced_nodes if relabel_nodes else None
induced_nodes_or_device = sgi.induced_nodes if relabel_nodes else device
induced_edges = sgi.induced_edges
subg = _create_hetero_subgraph(graph, sgi, induced_nodes, induced_edges, store_ids=store_ids)
subg = _create_hetero_subgraph(
graph, sgi, induced_nodes_or_device, induced_edges, store_ids=store_ids)
return subg if output_device is None else subg.to(output_device)
DGLHeteroGraph.out_subgraph = utils.alias_func(out_subgraph)
......@@ -1067,7 +1074,8 @@ DGLHeteroGraph.edge_type_subgraph = utils.alias_func(edge_type_subgraph)
#################### Internal functions ####################
def _create_hetero_subgraph(parent, sgi, induced_nodes, induced_edges, store_ids=True):
def _create_hetero_subgraph(parent, sgi, induced_nodes_or_device, induced_edges_or_device,
store_ids=True):
"""Internal function to create a subgraph.
Parameters
......@@ -1076,12 +1084,14 @@ def _create_hetero_subgraph(parent, sgi, induced_nodes, induced_edges, store_ids
The parent DGLGraph.
sgi : HeteroSubgraphIndex
Subgraph object returned by CAPI.
induced_nodes : list[Tensor] or None
Induced node IDs. Will store it as the dgl.NID ndata unless it
induced_nodes_or_device : list[Tensor] or device or None
Induced node IDs or the device. Will store it as the dgl.NID ndata unless it
is None, which means the induced node IDs are the same as the parent node IDs.
induced_edges : list[Tensor] or None
If a device is given, the features will be copied to the given device.
induced_edges_or_device : list[Tensor] or device or None
Induced edge IDs or the device. Will store it as the dgl.EID edata unless it
is None, which means the induced edge IDs are the same as the parent edge IDs.
If a device is given, the features will be copied to the given device.
store_ids : bool
If True and induced_nodes is not None, it will store the raw IDs of the extracted
nodes in the ``ndata`` of the resulting graph under name ``dgl.NID``.
......@@ -1093,8 +1103,12 @@ def _create_hetero_subgraph(parent, sgi, induced_nodes, induced_edges, store_ids
DGLGraph
Graph
"""
node_frames = utils.extract_node_subframes(parent, induced_nodes, store_ids)
edge_frames = utils.extract_edge_subframes(parent, induced_edges, store_ids)
# (BarclayII) Giving a device argument to induced_nodes_or_device is necessary for
# UVA subgraphing, where the node features are not sliced but the device changes.
# Not having this will give us a subgraph on GPU but node features on CPU if we don't
# relabel the nodes.
node_frames = utils.extract_node_subframes(parent, induced_nodes_or_device, store_ids)
edge_frames = utils.extract_edge_subframes(parent, induced_edges_or_device, store_ids)
hsg = DGLHeteroGraph(sgi.graph, parent.ntypes, parent.etypes)
utils.set_new_frames(hsg, node_frames=node_frames, edge_frames=edge_frames)
return hsg
......
......@@ -11,6 +11,10 @@ from .. import backend as F
from .. import ndarray as nd
from .._ffi.function import _init_api
def is_listlike(data):
"""Return if the data is a sequence but not a string."""
return isinstance(data, Sequence) and not isinstance(data, str)
class InconsistentDtypeException(DGLError):
"""Exception class for inconsistent dtype between graph and tensor"""
def __init__(self, msg='', *args, **kwargs): #pylint: disable=W1113
......@@ -759,9 +763,9 @@ def relabel(x):
F.copy_to(F.arange(0, len(unique_x), dtype), ctx))
return unique_x, old_to_new
def extract_node_subframes(graph, nodes, store_ids=True):
def extract_node_subframes(graph, nodes_or_device, store_ids=True):
"""Extract node features of the given nodes from :attr:`graph`
and return them in frames.
and return them in frames on the given device.
Note that this function does not perform actual tensor memory copy but uses `Frame.subframe`
to get the features. If :attr:`nodes` is None, it performs a shallow copy of the
......@@ -772,9 +776,11 @@ def extract_node_subframes(graph, nodes, store_ids=True):
----------
graph : DGLGraph
The graph to extract features from.
nodes : list[Tensor] or None
Node IDs. If not None, the list length must be equal to the number of node types
in the graph. If None, the whole frame is shallow-copied.
nodes_or_device : list[Tensor] or device or None
Node IDs or device.
If a list, the list length must be equal to the number of node types
in the graph.
If None, the whole frame is shallow-copied.
store_ids : bool
If True, the returned frames will store :attr:`nodes` in the ``dgl.NID`` field
unless it is None.
......@@ -784,15 +790,17 @@ def extract_node_subframes(graph, nodes, store_ids=True):
list[Frame]
Extracted node frames.
"""
if nodes is None:
if nodes_or_device is None:
node_frames = [nf.clone() for nf in graph._node_frames]
else:
elif is_listlike(nodes_or_device):
node_frames = []
for i, ind_nodes in enumerate(nodes):
for i, ind_nodes in enumerate(nodes_or_device):
subf = graph._node_frames[i].subframe(ind_nodes)
if store_ids:
subf[NID] = ind_nodes
node_frames.append(subf)
else: # device object
node_frames = [nf.to(nodes_or_device) for nf in graph._node_frames]
return node_frames
def extract_node_subframes_for_block(graph, srcnodes, dstnodes):
......@@ -831,7 +839,7 @@ def extract_node_subframes_for_block(graph, srcnodes, dstnodes):
node_frames.append(subf)
return node_frames
def extract_edge_subframes(graph, edges, store_ids=True):
def extract_edge_subframes(graph, edges_or_device, store_ids=True):
"""Extract edge features of the given edges from :attr:`graph`
and return them in frames.
......@@ -844,9 +852,11 @@ def extract_edge_subframes(graph, edges, store_ids=True):
----------
graph : DGLGraph
The graph to extract features from.
edges : list[Tensor] or None
Edge IDs. If not None, the list length must be equal to the number of edge types
in the graph. If None, the whole frame is shallow-copied.
edges_or_device : list[Tensor] or device or None
Edge IDs or device.
If a list, the list length must be equal to the number of edge types
in the graph.
If None, the whole frame is shallow-copied.
store_ids : bool
If True, the returned frames will store :attr:`edges` in the ``dgl.EID`` field
unless it is None.
......@@ -856,15 +866,17 @@ def extract_edge_subframes(graph, edges, store_ids=True):
list[Frame]
Extracted edge frames.
"""
if edges is None:
if edges_or_device is None:
edge_frames = [nf.clone() for nf in graph._edge_frames]
else:
elif is_listlike(edges_or_device):
edge_frames = []
for i, ind_edges in enumerate(edges):
for i, ind_edges in enumerate(edges_or_device):
subf = graph._edge_frames[i].subframe(ind_edges)
if store_ids:
subf[EID] = ind_edges
edge_frames.append(subf)
else: # device object
edge_frames = [nf.to(edges_or_device) for nf in graph._edge_frames]
return edge_frames
def set_new_frames(graph, *, node_frames=None, edge_frames=None):
......@@ -941,7 +953,7 @@ def apply_each(data, fn, *args, **kwargs):
"""
if isinstance(data, Mapping):
return {k: fn(v, *args, **kwargs) for k, v in data.items()}
elif isinstance(data, Sequence):
elif is_listlike(data):
return [fn(v, *args, **kwargs) for v in data]
else:
return fn(data, *args, **kwargs)
......@@ -978,11 +990,9 @@ def recursive_apply(data, fn, *args, **kwargs):
>>> h = recursive_apply(h, torch.nn.functional.relu)
>>> assert all((v >= 0).all() for v in h.values())
"""
if isinstance(data, str): # str is a Sequence
return fn(data, *args, **kwargs)
elif isinstance(data, Mapping):
if isinstance(data, Mapping):
return {k: recursive_apply(v, fn, *args, **kwargs) for k, v in data.items()}
elif isinstance(data, Sequence):
elif is_listlike(data):
return [recursive_apply(v, fn, *args, **kwargs) for v in data]
else:
return fn(data, *args, **kwargs)
......@@ -991,19 +1001,22 @@ def recursive_apply_pair(data1, data2, fn, *args, **kwargs):
"""Recursively apply a function to every pair of elements in two containers with the
same nested structure.
"""
if isinstance(data1, str) or isinstance(data2, str):
return fn(data1, data2, *args, **kwargs)
elif isinstance(data1, Mapping) and isinstance(data2, Mapping):
if isinstance(data1, Mapping) and isinstance(data2, Mapping):
return {
k: recursive_apply_pair(data1[k], data2[k], fn, *args, **kwargs)
for k in data1.keys()}
elif isinstance(data1, Sequence) and isinstance(data2, Sequence):
elif is_listlike(data1) and is_listlike(data2):
return [recursive_apply_pair(x, y, fn, *args, **kwargs) for x, y in zip(data1, data2)]
else:
return fn(data1, data2, *args, **kwargs)
def context_of(data):
"""Return the device of the data which can be either a tensor or a dict of tensors."""
return F.context(next(iter(data.values())) if isinstance(data, Mapping) else data)
"""Return the device of the data which can be either a tensor or a list/dict of tensors."""
if isinstance(data, Mapping):
return F.context(next(iter(data.values())))
elif is_listlike(data):
return F.context(next(iter(data)))
else:
return F.context(data)
_init_api("dgl.utils.internal")
......@@ -2,6 +2,7 @@ import numpy as np
import networkx as nx
import unittest
import scipy.sparse as ssp
import pytest
import dgl
import backend as F
......@@ -595,5 +596,39 @@ def test_khop_out_subgraph(idtype):
assert F.array_equal(F.astype(inv['user'], idtype), F.tensor([0], idtype))
assert F.array_equal(F.astype(inv['game'], idtype), F.tensor([0], idtype))
@unittest.skipIf(not F.gpu_ctx(), 'only necessary with GPU')
@unittest.skipIf(dgl.backend.backend_name != "pytorch", reason="UVA only supported for PyTorch")
@pytest.mark.parametrize(
'parent_idx_device', [('cpu', F.cpu()), ('cuda', F.cuda()), ('uva', F.cpu()), ('uva', F.cuda())])
@pytest.mark.parametrize('child_device', [F.cpu(), F.cuda()])
def test_subframes(parent_idx_device, child_device):
parent_device, idx_device = parent_idx_device
g = dgl.graph((F.tensor([1,2,3], dtype=F.int64), F.tensor([2,3,4], dtype=F.int64)))
print(g.device)
g.ndata['x'] = F.randn((5, 4))
g.edata['a'] = F.randn((3, 6))
idx = F.tensor([1, 2], dtype=F.int64)
if parent_device == 'cuda':
g = g.to(F.cuda())
elif parent_device == 'uva':
g = g.to(F.cpu())
g.create_formats_()
g.pin_memory_()
elif parent_device == 'cpu':
g = g.to(F.cpu())
idx = F.copy_to(idx, idx_device)
sg = g.sample_neighbors(idx, 2).to(child_device)
assert sg.device == sg.ndata['x'].device
assert sg.device == sg.edata['a'].device
assert sg.device == child_device
if parent_device != 'uva':
sg = g.to(child_device).sample_neighbors(F.copy_to(idx, child_device), 2)
assert sg.device == sg.ndata['x'].device
assert sg.device == sg.edata['a'].device
assert sg.device == child_device
if parent_device == 'uva':
g.unpin_memory_()
if __name__ == '__main__':
test_khop_out_subgraph(F.int64)
test_subframes(('cpu', F.cpu()), F.cuda())
......@@ -101,8 +101,7 @@ def _check_device(data):
assert data.device == F.ctx()
@pytest.mark.parametrize('sampler_name', ['full', 'neighbor', 'neighbor2'])
# TODO(BarclayII): Re-enable pin_graph = True after PyTorch is upgraded to 1.9.0 on CI
@pytest.mark.parametrize('pin_graph', [False])
@pytest.mark.parametrize('pin_graph', [False, True])
def test_node_dataloader(sampler_name, pin_graph):
g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4]))
if F.ctx() != F.cpu() and pin_graph:
......@@ -156,8 +155,7 @@ def test_node_dataloader(sampler_name, pin_graph):
dgl.dataloading.negative_sampler.Uniform(2),
dgl.dataloading.negative_sampler.GlobalUniform(15, False, 3),
dgl.dataloading.negative_sampler.GlobalUniform(15, True, 3)])
# TODO(BarclayII): Re-enable pin_graph = True after PyTorch is upgraded to 1.9.0 on CI
@pytest.mark.parametrize('pin_graph', [False])
@pytest.mark.parametrize('pin_graph', [False, True])
def test_edge_dataloader(sampler_name, neg_sampler, pin_graph):
g1 = dgl.graph(([0, 0, 0, 1, 1], [1, 2, 3, 3, 4]))
if F.ctx() != F.cpu() and pin_graph:
......