Unverified commit 76bb5404, authored by Hongzhi (Steve), Chen and committed by GitHub

[Misc] Black auto fix. (#4682)


Co-authored-by: Steve <ubuntu@ip-172-31-34-29.ap-northeast-1.compute.internal>
parent a208e886
"""dgl sddmm operator module."""
import sys
from itertools import product
from .. import backend as F
from ..backend import gsddmm as gsddmm_internal
from ..backend import gsddmm_hetero as gsddmm_internal_hetero
__all__ = ["gsddmm", "copy_u", "copy_v", "copy_e"]
def reshape_lhs_rhs(lhs_data, rhs_data):
r""" Expand dims so that there will be no broadcasting issues with different
r"""Expand dims so that there will be no broadcasting issues with different
number of dimensions. For example, given two shapes (N, 3, 1), (E, 5, 3, 4)
that are valid broadcastable shapes, change them to (N, 1, 3, 1) and
(E, 5, 3, 4)
......@@ -33,8 +34,9 @@ def reshape_lhs_rhs(lhs_data, rhs_data):
rhs_data = F.reshape(rhs_data, new_rhs_shape)
return lhs_data, rhs_data
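# Illustrative sketch (not part of this diff): the padding described above, shown with
# plain torch tensors; shapes (N, 3, 1) and (E, 5, 3, 4) follow the docstring's example.
import torch
lhs = torch.randn(6, 3, 1)      # per-node features, shape (N, 3, 1)
rhs = torch.randn(10, 5, 3, 4)  # per-edge features, shape (E, 5, 3, 4)
# pad the shorter feature shape with leading 1s after the first (node/edge) axis
lhs = lhs.reshape(6, 1, 3, 1)   # (N, 1, 3, 1) now broadcasts against (E, 5, 3, 4)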
def gsddmm(g, op, lhs_data, rhs_data, lhs_target="u", rhs_target="v"):
r"""Generalized Sampled-Dense-Dense Matrix Multiplication interface.
It computes edge features by applying :attr:`op` to lhs features and rhs features.
.. math::
......@@ -69,30 +71,34 @@ def gsddmm(g, op, lhs_data, rhs_data, lhs_target='u', rhs_target='v'):
The result tensor.
"""
if g._graph.number_of_etypes() == 1:
if op not in ["copy_lhs", "copy_rhs"]:
lhs_data, rhs_data = reshape_lhs_rhs(lhs_data, rhs_data)
return gsddmm_internal(
g._graph, op, lhs_data, rhs_data, lhs_target, rhs_target
)
else:
if op == "copy_lhs":
rhs_data = [None] * g._graph.number_of_etypes()
elif op == "copy_rhs":
lhs_data = [None] * g._graph.number_of_ntypes()
# TODO (Israt): Call reshape_lhs_rhs() on lhs and rhs data to match their dimension
# and avoid broadcasting issue. Handle the case where different nodes have
# different dimensions, and different etypes may need different broadcasting
# dims for the same node.
lhs_and_rhs_tuple = tuple(list(lhs_data) + list(rhs_data))
return gsddmm_internal_hetero(
g._graph,
op,
len(lhs_data),
lhs_target,
rhs_target,
*lhs_and_rhs_tuple
)
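# Illustrative usage sketch (not part of this diff), assuming a PyTorch backend; this
# gsddmm is the same function exported as dgl.ops.gsddmm.
import dgl
import torch
g = dgl.graph(([0, 1, 2], [1, 2, 0]))
x = torch.randn(g.num_nodes(), 4)  # lhs: source-node features ("u")
y = torch.randn(g.num_nodes(), 4)  # rhs: destination-node features ("v")
e = dgl.ops.gsddmm(g, "dot", x, y, lhs_target="u", rhs_target="v")
print(e.shape)  # (g.num_edges(), 1), one dot product per edge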
def _gen_sddmm_func(lhs_target, rhs_target, binary_op):
name = "{}_{}_{}".format(lhs_target, binary_op, rhs_target)
target_dict = {"u": "source node", "e": "edge", "v": "destination node"}
lhs_str = target_dict[lhs_target]
rhs_str = target_dict[rhs_target]
docstring = r"""Generalized SDDMM function.
......@@ -121,11 +127,15 @@ def _gen_sddmm_func(lhs_target, rhs_target, binary_op):
Broadcasting follows NumPy semantics. Please see
https://docs.scipy.org/doc/numpy/user/basics.broadcasting.html
for more details about the NumPy broadcasting semantics.
""".format(op=binary_op, lhs=lhs_str, rhs=rhs_str)
""".format(
op=binary_op, lhs=lhs_str, rhs=rhs_str
)
def func(g, x, y):
return gsddmm(
g, binary_op, x, y, lhs_target=lhs_target, rhs_target=rhs_target
)
func.__name__ = name
func.__doc__ = docstring
return func
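# Sketch of what the generated functions look like from the caller's side (not part of
# this diff): _gen_sddmm_func("u", "v", "dot") yields dgl.ops.u_dot_v, which is just
# gsddmm with fixed targets.
import dgl
import torch
g = dgl.graph(([0, 1], [1, 2]))
x = torch.randn(g.num_nodes(), 4)
y = torch.randn(g.num_nodes(), 4)
assert torch.allclose(dgl.ops.u_dot_v(g, x, y), dgl.ops.gsddmm(g, "dot", x, y))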
......@@ -161,7 +171,7 @@ def copy_u(g, x):
-----
This function supports autograd (computing input gradients given the output gradient).
"""
return gsddmm(g, "copy_lhs", x, None)
def copy_v(g, x):
......@@ -183,7 +193,7 @@ def copy_v(g, x):
-----
This function supports autograd (computing input gradients given the output gradient).
"""
return gsddmm(g, "copy_rhs", None, x)
# pylint: disable=unused-argument
......
"""Segment aggregation operators implemented using DGL graph."""
from .. import backend as F
from ..base import DGLError
__all__ = ["segment_reduce", "segment_softmax", "segment_mm"]
def segment_reduce(seglen, value, reducer="sum"):
"""Segment reduction operator.
It aggregates the value tensor along the first dimension by segments.
......@@ -41,16 +42,17 @@ def segment_reduce(seglen, value, reducer='sum'):
[4., 4., 4.]])
"""
offsets = F.cumsum(
F.cat([F.zeros((1,), F.dtype(seglen), F.context(seglen)), seglen], 0), 0
)
if reducer == "mean":
rst = F.segment_reduce("sum", value, offsets)
rst_shape = F.shape(rst)
z = F.astype(F.clamp(seglen, 1, len(value)), F.dtype(rst))
z_shape = (rst_shape[0],) + (1,) * (len(rst_shape) - 1)
return rst / F.reshape(z, z_shape)
elif reducer in ["min", "sum", "max"]:
rst = F.segment_reduce(reducer, value, offsets)
if reducer in ["min", "max"]:
rst = F.replace_inf_with_zero(rst)
return rst
else:
......@@ -95,13 +97,14 @@ def segment_softmax(seglen, value):
[0.2500, 0.2500, 0.2500],
[0.2500, 0.2500, 0.2500]])
"""
value_max = segment_reduce(seglen, value, reducer="max")
value = F.exp(value - F.repeat(value_max, seglen, dim=0))
value_sum = segment_reduce(seglen, value, reducer="sum")
return value / F.repeat(value_sum, seglen, dim=0)
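# Illustrative usage sketch (not part of this diff): segment lengths split the first
# dimension of `value` into consecutive chunks, as in the docstring examples above.
import dgl
import torch
seglen = torch.tensor([2, 3, 1])                         # three segments of rows
value = torch.arange(6, dtype=torch.float32).view(6, 1)
sums = dgl.ops.segment_reduce(seglen, value, reducer="sum")  # shape (3, 1)
probs = dgl.ops.segment_softmax(seglen, value)               # softmax within each segment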
def segment_mm(a, b, seglen_a):
r""" Performs matrix multiplication according to segments.
r"""Performs matrix multiplication according to segments.
Suppose ``seglen_a == [10, 5, 0, 3]``, the operator will perform
four matrix multiplications::
......
"""dgl spmm operator module."""
import sys
from .. import backend as F
from ..backend import gspmm as gspmm_internal
from ..backend import gspmm_hetero as gspmm_internal_hetero
__all__ = ["gspmm"]
def reshape_lhs_rhs(lhs_data, rhs_data):
r""" Expand dims so that there will be no broadcasting issues with different
r"""Expand dims so that there will be no broadcasting issues with different
number of dimensions. For example, given two shapes (N, 3, 1), (E, 5, 3, 4)
that are valid broadcastable shapes, change them to (N, 1, 3, 1) and
(E, 5, 3, 4)
......@@ -32,8 +33,9 @@ def reshape_lhs_rhs(lhs_data, rhs_data):
rhs_data = F.reshape(rhs_data, new_rhs_shape)
return lhs_data, rhs_data
def gspmm(g, op, reduce_op, lhs_data, rhs_data):
r""" Generalized Sparse Matrix Multiplication interface.
r"""Generalized Sparse Matrix Multiplication interface.
It fuses two steps into one kernel.
1. Computes messages by applying :attr:`op` to source node and edge features.
......@@ -69,27 +71,45 @@ def gspmm(g, op, reduce_op, lhs_data, rhs_data):
The result tensor.
"""
if g._graph.number_of_etypes() == 1:
if op not in ["copy_lhs", "copy_rhs"]:
lhs_data, rhs_data = reshape_lhs_rhs(lhs_data, rhs_data)
# With max and min reducers infinity will be returned for zero degree nodes
ret = gspmm_internal(
g._graph,
op,
"sum" if reduce_op == "mean" else reduce_op,
lhs_data,
rhs_data,
)
else:
# lhs_data or rhs_data is None only in unary functions like ``copy_u`` or ``copy_e``
lhs_data = (
[None] * g._graph.number_of_ntypes()
if lhs_data is None
else lhs_data
)
rhs_data = (
[None] * g._graph.number_of_etypes()
if rhs_data is None
else rhs_data
)
# TODO (Israt): Call reshape func
lhs_and_rhs_tuple = tuple(list(lhs_data) + list(rhs_data))
ret = gspmm_internal_hetero(
g._graph,
op,
"sum" if reduce_op == "mean" else reduce_op,
len(lhs_data),
*lhs_and_rhs_tuple
)
# TODO (Israt): Add support for 'mean' in heterograph
# divide in degrees for mean reducer.
if reduce_op == "mean":
ret_shape = F.shape(ret)
deg = g.in_degrees()
deg = F.astype(
F.clamp(deg, 1, max(g.number_of_edges(), 1)), F.dtype(ret)
)
deg_shape = (ret_shape[0],) + (1,) * (len(ret_shape) - 1)
return ret / F.reshape(deg, deg_shape)
else:
......@@ -98,13 +118,17 @@ def gspmm(g, op, reduce_op, lhs_data, rhs_data):
def _attach_zerodeg_note(docstring, reducer):
note1 = """
The {} function will return zero for nodes with no incoming messages.""".format(
reducer
)
note2 = """
This is implemented by replacing all {} values to zero.
""".format("infinity" if reducer == "min" else "negative infinity")
""".format(
"infinity" if reducer == "min" else "negative infinity"
)
docstring = docstring + note1
if reducer in ("min", "max"):
docstring = docstring + note2
return docstring
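# Small sketch of the note above (not part of this diff): with a min/max reducer,
# nodes that receive no messages get 0 instead of +/- infinity.
import dgl
import torch
g = dgl.graph(([0], [1]), num_nodes=3)   # nodes 0 and 2 have no incoming edges
x = torch.ones(3, 2)
out = dgl.ops.copy_u_min(g, x)
print(out)  # rows 0 and 2 are all zeros, row 1 is all ones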
......@@ -140,11 +164,14 @@ def _gen_spmm_func(binary_op, reduce_op):
Broadcasting follows NumPy semantics. Please see
https://docs.scipy.org/doc/numpy/user/basics.broadcasting.html
for more details about the NumPy broadcasting semantics.
""".format(binary_op, reduce_op)
""".format(
binary_op, reduce_op
)
docstring = _attach_zerodeg_note(docstring, reduce_op)
def func(g, x, y):
return gspmm(g, binary_op, reduce_op, x, y)
func.__name__ = name
func.__doc__ = docstring
return func
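# Illustrative usage sketch (not part of this diff): gspmm fuses the message and reduce
# steps; the generated u_mul_e_sum is the same computation with fixed ops.
import dgl
import torch
g = dgl.graph(([0, 1, 2], [1, 2, 3]))
x = torch.randn(g.num_nodes(), 8)   # source-node features
w = torch.randn(g.num_edges(), 8)   # edge features
h = dgl.ops.gspmm(g, "mul", "sum", x, w)   # shape (g.num_nodes(), 8)
assert torch.allclose(h, dgl.ops.u_mul_e_sum(g, x, w))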
......@@ -155,13 +182,11 @@ def _gen_copy_reduce_func(binary_op, reduce_op):
name = "{}_{}".format(binary_op, reduce_op)
binary_str = {
"copy_u": "It copies node feature to edge as the message.",
"copy_e": "It regards edge feature as message.",
}
x_str = {"copy_u": "source node", "copy_e": "edge"}
docstring = lambda binary_op: _attach_zerodeg_note(
"""Generalized SpMM function. {}
Then aggregates the message by {} on destination nodes.
Parameters
......@@ -180,15 +205,16 @@ def _gen_copy_reduce_func(binary_op, reduce_op):
-----
This function supports autograd (computing input gradients given the output gradient).
""".format(
binary_str[binary_op], reduce_op, x_str[binary_op]
),
reduce_op,
)
def func(g, x):
if binary_op == "copy_u":
return gspmm(g, "copy_lhs", reduce_op, x, None)
else:
return gspmm(g, "copy_rhs", reduce_op, None, x)
func.__name__ = name
func.__doc__ = docstring(binary_op)
......
"""dgl optims."""
import importlib
import os
import sys
from ..backend import backend_name
from ..utils import expand_as_pair
def _load_backend(mod_name):
mod = importlib.import_module(".%s" % mod_name, __name__)
thismod = sys.modules[__name__]
for api, obj in mod.__dict__.items():
setattr(thismod, api, obj)
_load_backend(backend_name)
"""Node embedding optimizers"""
import abc
from abc import abstractmethod
import torch as th
from ...cuda import nccl
from ...nn.pytorch import NodeEmbedding
from ...partition import NDArrayPartition
from ...utils import (
create_shared_mem_array,
gather_pinned_tensor_rows,
get_shared_mem_array,
pin_memory_inplace,
scatter_pinned_tensor_rows,
)
class SparseGradOptimizer(abc.ABC):
r"""The abstract sparse optimizer.
Note: dgl sparse optimizers only work with dgl.NodeEmbedding
......@@ -21,7 +27,8 @@ class SparseGradOptimizer(abc.ABC):
The list of NodeEmbeddings.
lr : float
The learning rate.
"""
def __init__(self, params, lr):
self._params = params
self._lr = lr
......@@ -37,47 +44,54 @@ class SparseGradOptimizer(abc.ABC):
# otherwise it will crash the training
self.shmem_buffer_holder = []
assert len(params) > 0, "Empty parameters"
# if we are using shared memory for communication
for emb in params:
assert isinstance(
emb, NodeEmbedding
), "DGL SparseOptimizer only supports dgl.nn.NodeEmbedding"
if self._rank is None:
self._rank = emb.rank
self._world_size = emb.world_size
else:
assert (
self._rank == emb.rank
), "MultiGPU rank for each embedding should be same."
assert (
self._world_size == emb.world_size
), "MultiGPU world_size for each embedding should be same."
assert not self._rank is None
assert not self._world_size is None
self._nccl_root_id = "SparseGradOptimizer.nccl_root_id"
def step(self):
"""The step function.
The step function is invoked at the end of every batch to update embeddings
"""
# on the first step, check to see if the grads are on the GPU
if self._first_step:
for emb in self._params:
for _, data in emb._trace:
if data.grad.data.device.type == "cuda":
# create a communicator
if self._device:
assert (
self._device == data.grad.device
), "All gradients must be on the same device"
else:
self._device = data.grad.device
else:
assert (
not self._device
), "All gradients must be on the same device"
# distributed backend use nccl
if self._device and (
not th.distributed.is_initialized()
or th.distributed.get_backend() == "nccl"
):
# device is only set if the grads are on a GPU
self._comm_setup()
else:
......@@ -91,16 +105,15 @@ class SparseGradOptimizer(abc.ABC):
self._shared_step()
def setup(self, params):
"""This is function where subclasses can perform any setup they need
to. It will be called during the first step, and communicators or
shared memory will have been setup before this call.
Parameters
----------
params : list of NodeEmbedding
The list of NodeEmbeddings.
"""
def _comm_setup(self):
# find a store to communicate the unique id through
......@@ -120,26 +133,32 @@ class SparseGradOptimizer(abc.ABC):
uid = store.get(self._nccl_root_id)
nccl_id = nccl.UniqueId(uid)
# needs to be set for nccl to work
self._comm = nccl.Communicator(
self._world_size, self._rank, nccl_id
)
th.distributed.barrier()
def _shared_setup(self):
for emb in self._params:
emb_name = emb.name
if self._rank == 0: # the master gpu process
opt_meta = create_shared_mem_array(
emb_name + "_opt_meta",
(self._world_size, self._world_size),
th.int32,
).zero_()
if self._rank == 0:
emb.store.set(emb_name + "_opt_meta", emb_name)
self._opt_meta[emb_name] = opt_meta
elif self._rank > 0:
# receive
emb.store.wait([emb_name + "_opt_meta"])
opt_meta = get_shared_mem_array(
emb_name + "_opt_meta",
(self._world_size, self._world_size),
th.int32,
)
self._opt_meta[emb_name] = opt_meta
def _comm_step(self):
......@@ -147,7 +166,7 @@ class SparseGradOptimizer(abc.ABC):
with th.no_grad():
idx_in = {}
grad_in = {}
for emb in self._params: # pylint: disable=too-many-nested-blocks
emb_name = emb.name
partition = emb.partition
......@@ -156,14 +175,17 @@ class SparseGradOptimizer(abc.ABC):
partition = NDArrayPartition(
emb.num_embeddings,
self._world_size if self._world_size > 0 else 1,
mode="remainder",
)
# we need to combine gradients from multiple forward paths
if len(emb._trace) == 0:
idx = th.zeros((0,), dtype=th.long, device=self._device)
grad = th.zeros(
(0, emb.embedding_dim),
dtype=th.float32,
device=self._device,
)
elif len(emb._trace) == 1:
# the special case where we can use the tensors as is
# without any memcpy's
......@@ -178,9 +200,10 @@ class SparseGradOptimizer(abc.ABC):
idx = th.cat(idx, dim=0)
grad = th.cat(grad, dim=0)
(
idx_in[emb_name],
grad_in[emb_name],
) = comm.sparse_all_to_all_push(idx, grad, partition=partition)
if emb.partition:
# if the embedding is partitioned, map back to indexes
# into the local tensor
......@@ -198,7 +221,6 @@ class SparseGradOptimizer(abc.ABC):
grad = grad_in[emb_name]
self.update(idx, grad, emb)
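# Conceptual sketch of the push above (not part of this diff): with the default
# remainder partition, the owner rank of each embedding row is simply idx % world_size,
# and sparse_all_to_all_push routes every (idx, grad) pair to that owner rank.
import torch
world_size = 4
idx = torch.tensor([3, 7, 8, 12])
owner = idx % world_size   # tensor([3, 3, 0, 0]): rank that stores each row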
def _shared_step(self):
with th.no_grad():
# Frequently alloc and free shared memory to hold intermediate tensor is expensive
......@@ -206,7 +228,7 @@ class SparseGradOptimizer(abc.ABC):
shared_emb = {emb.name: ([], []) for emb in self._params}
# Go through all sparse embeddings
for emb in self._params: # pylint: disable=too-many-nested-blocks
emb_name = emb.name
# we need to combine gradients from multiple forward paths
......@@ -222,10 +244,20 @@ class SparseGradOptimizer(abc.ABC):
# Note: we cannot skip the gradient exchange and update steps as other
# working processes may send gradient update requests corresponding
# to certain embedding to this process.
idx = (
th.cat(idx, dim=0)
if len(idx) != 0
else th.zeros((0,), dtype=th.long, device=th.device("cpu"))
)
grad = (
th.cat(grad, dim=0)
if len(grad) != 0
else th.zeros(
(0, emb.embedding_dim),
dtype=th.float32,
device=th.device("cpu"),
)
)
device = grad.device
idx_dtype = idx.dtype
......@@ -263,22 +295,40 @@ class SparseGradOptimizer(abc.ABC):
# currently nccl does not support Alltoallv operation
# we need to use CPU shared memory to share gradient
# across processes
idx_i = idx_i.to(th.device("cpu"))
grad_i = grad_i.to(th.device("cpu"))
idx_shmem_name = "idx_{}_{}_{}".format(
emb_name, self._rank, i
)
grad_shmem_name = "grad_{}_{}_{}".format(
emb_name, self._rank, i
)
# Create shared memory to hold temporary index and gradient tensor for
# cross-process send and recv.
if (
idx_shmem_name
not in self._shared_cache[emb_name]
or self._shared_cache[emb_name][
idx_shmem_name
].shape[0]
< idx_i.shape[0]
):
if (
idx_shmem_name
in self._shared_cache[emb_name]
):
self.shmem_buffer_holder.append(
self._shared_cache[emb_name][
idx_shmem_name
]
)
self.shmem_buffer_holder.append(
self._shared_cache[emb_name][
grad_shmem_name
]
)
# The total number of buffers is the number of NodeEmbeddings *
# world_size * (world_size - 1). The minimum buffer size is 128.
......@@ -287,22 +337,40 @@ class SparseGradOptimizer(abc.ABC):
# frequent shared memory allocation.
# The overall buffer cost will be smaller than three times
# the maximum memory requirement for sharing gradients.
buffer_size = (
128
if idx_i.shape[0] < 128
else idx_i.shape[0] * 2
)
idx_shmem = create_shared_mem_array(
"{}_{}".format(idx_shmem_name, buffer_size),
(buffer_size,),
idx_dtype,
)
grad_shmem = create_shared_mem_array(
"{}_{}".format(
grad_shmem_name, buffer_size
),
(buffer_size, grad_dim),
grad_dtype,
)
self._shared_cache[emb_name][
idx_shmem_name
] = idx_shmem
self._shared_cache[emb_name][
grad_shmem_name
] = grad_shmem
# Fill shared memory with the temporary index tensor and gradient tensor
self._shared_cache[emb_name][idx_shmem_name][
: idx_i.shape[0]
] = idx_i
self._shared_cache[emb_name][grad_shmem_name][
: idx_i.shape[0]
] = grad_i
self._opt_meta[emb_name][self._rank][
i
] = idx_i.shape[0]
else:
shared_emb[emb_name][0].append(idx)
shared_emb[emb_name][1].append(grad)
......@@ -310,7 +378,7 @@ class SparseGradOptimizer(abc.ABC):
# make sure the idx shape is passed to each process through opt_meta
if self._world_size > 1:
th.distributed.barrier()
for emb in self._params: # pylint: disable=too-many-nested-blocks
emb_name = emb.name
if self._world_size > 1:
# The first element in shared_emb[emb_name][0] is the local idx
......@@ -318,30 +386,56 @@ class SparseGradOptimizer(abc.ABC):
# gather gradients from all other processes
for i in range(self._world_size):
if i != self._rank:
idx_shmem_name = "idx_{}_{}_{}".format(
emb_name, i, self._rank
)
grad_shmem_name = "grad_{}_{}_{}".format(
emb_name, i, self._rank
)
size = self._opt_meta[emb_name][i][self._rank]
# Retrieve the shared memory holding the temporary index and gradient
# tensor that is sent to current training process
if (
idx_shmem_name
not in self._shared_cache[emb_name]
or self._shared_cache[emb_name][
idx_shmem_name
].shape[0]
< size
):
buffer_size = 128 if size < 128 else size * 2
idx_shmem = get_shared_mem_array(
"{}_{}".format(idx_shmem_name, buffer_size),
(buffer_size,),
idx_dtype,
)
grad_shmem = get_shared_mem_array(
"{}_{}".format(
grad_shmem_name, buffer_size
),
(buffer_size, grad_dim),
grad_dtype,
)
self._shared_cache[emb_name][
idx_shmem_name
] = idx_shmem
self._shared_cache[emb_name][
grad_shmem_name
] = grad_shmem
idx_i = self._shared_cache[emb_name][
idx_shmem_name
][:size]
grad_i = self._shared_cache[emb_name][
grad_shmem_name
][:size]
shared_emb[emb_name][0].append(
idx_i.to(device, non_blocking=True)
)
shared_emb[emb_name][1].append(
grad_i.to(device, non_blocking=True)
)
if self._clean_grad:
# clean gradient track
......@@ -362,7 +456,7 @@ class SparseGradOptimizer(abc.ABC):
@abstractmethod
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
"""Update embeddings in a sparse manner
Sparse embeddings are updated in mini batches. We maintain gradient states for
each embedding so they can be updated separately.
......@@ -377,12 +471,12 @@ class SparseGradOptimizer(abc.ABC):
"""
def zero_grad(self):
"""clean grad cache
"""
"""clean grad cache"""
self._clean_grad = True
class SparseAdagrad(SparseGradOptimizer):
r"""Node embedding optimizer using the Adagrad algorithm.
This optimizer implements a sparse version of Adagrad algorithm for
optimizing :class:`dgl.nn.NodeEmbedding`. Being sparse means it only updates
......@@ -418,7 +512,8 @@ class SparseAdagrad(SparseGradOptimizer):
... loss = F.sum(feats + 1, 0)
... loss.backward()
... optimizer.step()
"""
def __init__(self, params, lr, eps=1e-10):
super(SparseAdagrad, self).__init__(params, lr)
self._eps = eps
......@@ -426,38 +521,43 @@ class SparseAdagrad(SparseGradOptimizer):
def setup(self, params):
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(
emb, NodeEmbedding
), "SparseAdagrad only supports dgl.nn.NodeEmbedding"
emb_name = emb.name
if th.device(emb.emb_tensor.device) == th.device("cpu"):
# if our embedding is on the CPU, our state also has to be
if self._rank < 0:
state = th.empty(
emb.weight.shape,
dtype=th.float32,
device=th.device("cpu"),
).zero_()
elif self._rank == 0:
state = create_shared_mem_array(
emb_name + "_state", emb.weight.shape, th.float32
).zero_()
if self._world_size > 1:
emb.store.set(emb_name + "_opt", emb_name)
elif self._rank > 0:
# receive
emb.store.wait([emb_name + "_opt"])
state = get_shared_mem_array(
emb_name + "_state", emb.weight.shape, th.float32
)
else:
# distributed state on gpu
state = th.empty(
emb.emb_tensor.shape,
dtype=th.float32,
device=emb.emb_tensor.device,
).zero_()
emb.set_optm_state(state)
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
"""Update embeddings in a sparse manner
Sparse embeddings are updated in mini batches. We maintain gradient states for
each embedding so they can be updated separately.
......@@ -474,12 +574,16 @@ class SparseAdagrad(SparseGradOptimizer):
clr = self._lr
# the update is non-linear so indices must be unique
grad_indices, inverse, cnt = th.unique(
idx, return_inverse=True, return_counts=True
)
grad_values = th.zeros(
(grad_indices.shape[0], grad.shape[1]), device=grad.device
)
grad_values.index_add_(0, inverse, grad)
grad_values = grad_values / cnt.unsqueeze(1)
grad_sum = grad_values * grad_values
state = emb.optm_state
state_dev = state.device
state_idx = grad_indices.to(state_dev)
......@@ -491,8 +595,9 @@ class SparseAdagrad(SparseGradOptimizer):
tmp = clr * grad_values / std_values
emb.weight[state_idx] -= tmp.to(state_dev)
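# Equivalent dense-tensor sketch of the sparse Adagrad update above (illustrative only;
# it uses a plain torch tensor instead of a NodeEmbedding):
import torch
lr, eps = 0.01, 1e-10
weight = torch.randn(5, 3)
state = torch.zeros(5, 3)                       # running sum of squared gradients
idx = torch.tensor([0, 2, 2])                   # rows touched in this mini-batch
grad = torch.randn(3, 3)
rows, inverse, cnt = torch.unique(idx, return_inverse=True, return_counts=True)
g = torch.zeros(rows.shape[0], 3).index_add_(0, inverse, grad) / cnt.unsqueeze(1)
state[rows] += g * g
weight[rows] -= lr * g / (torch.sqrt(state[rows]) + eps)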
class SparseAdam(SparseGradOptimizer):
r"""Node embedding optimizer using the Adam algorithm.
This optimizer implements a sparse version of the Adam algorithm for
optimizing :class:`dgl.nn.NodeEmbedding`. Being sparse means it only
......@@ -545,9 +650,17 @@ class SparseAdam(SparseGradOptimizer):
... loss = F.sum(feats + 1, 0)
... loss.backward()
... optimizer.step()
"""
def __init__(
self,
params,
lr,
betas=(0.9, 0.999),
eps=1e-08,
use_uva=None,
dtype=th.float32,
):
super(SparseAdam, self).__init__(params, lr)
self._lr = lr
self._beta1 = betas[0]
......@@ -556,9 +669,10 @@ class SparseAdam(SparseGradOptimizer):
self._use_uva = use_uva
self._nd_handle = {}
self._is_using_uva = {}
assert dtype in [th.float16, th.float32], (
"Unsupported dtype {}. Valid choices are th.float32 "
"and th.float16".format(dtype)
)
self._dtype = dtype
def _setup_uva(self, name, mem, power):
......@@ -570,44 +684,54 @@ class SparseAdam(SparseGradOptimizer):
def setup(self, params):
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(
emb, NodeEmbedding
), "SparseAdam only supports dgl.nn.NodeEmbedding"
emb_name = emb.name
self._is_using_uva[emb_name] = self._use_uva
if th.device(emb.emb_tensor.device) == th.device("cpu"):
# if our embedding is on the CPU, our state also has to be
if self._rank < 0:
state_step = th.empty(
(emb.weight.shape[0],),
dtype=th.int32,
device=th.device("cpu"),
).zero_()
state_mem = th.empty(
emb.weight.shape,
dtype=self._dtype,
device=th.device("cpu"),
).zero_()
state_power = th.empty(
emb.weight.shape,
dtype=self._dtype,
device=th.device("cpu"),
).zero_()
elif self._rank == 0:
state_step = create_shared_mem_array(
emb_name + "_step", (emb.weight.shape[0],), th.int32
).zero_()
state_mem = create_shared_mem_array(
emb_name + "_mem", emb.weight.shape, self._dtype
).zero_()
state_power = create_shared_mem_array(
emb_name + "_power", emb.weight.shape, self._dtype
).zero_()
if self._world_size > 1:
emb.store.set(emb_name + "_opt", emb_name)
elif self._rank > 0:
# receive
emb.store.wait([emb_name + "_opt"])
state_step = get_shared_mem_array(
emb_name + "_step", (emb.weight.shape[0],), th.int32
)
state_mem = get_shared_mem_array(
emb_name + "_mem", emb.weight.shape, self._dtype
)
state_power = get_shared_mem_array(
emb_name + "_power", emb.weight.shape, self._dtype
)
if self._is_using_uva[emb_name]:
# if use_uva has been explicitly set to true, otherwise
......@@ -621,20 +745,23 @@ class SparseAdam(SparseGradOptimizer):
state_step = th.empty(
[emb.emb_tensor.shape[0]],
dtype=th.int32,
device=emb.emb_tensor.device,
).zero_()
state_mem = th.empty(
emb.emb_tensor.shape,
dtype=self._dtype,
device=emb.emb_tensor.device,
).zero_()
state_power = th.empty(
emb.emb_tensor.shape,
dtype=self._dtype,
device=emb.emb_tensor.device,
).zero_()
state = (state_step, state_mem, state_power)
emb.set_optm_state(state)
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
"""Update embeddings in a sparse manner
Sparse embeddings are updated in mini batches. We maintain gradient states for
each embedding so they can be updated separately.
......@@ -655,7 +782,7 @@ class SparseAdam(SparseGradOptimizer):
# whether or not we need to transfer data from the GPU to the CPU
# while updating the weights
is_d2h = state_dev.type == "cpu" and exec_dev.type == "cuda"
# only perform async copies cpu -> gpu, or gpu-> gpu, but block
# when copying to the cpu, so as to ensure the copy is finished
......@@ -678,16 +805,18 @@ class SparseAdam(SparseGradOptimizer):
clr = self._lr
# There can be duplicated indices due to sampling.
# Thus unique them here and average the gradient here.
grad_indices, inverse, cnt = th.unique(
idx, return_inverse=True, return_counts=True
)
state_idx = grad_indices.to(state_dev)
state_step[state_idx] += 1
state_step = state_step[state_idx].to(exec_dev)
if use_uva:
orig_mem = gather_pinned_tensor_rows(state_mem, grad_indices)
orig_power = gather_pinned_tensor_rows(
state_power, grad_indices
)
else:
orig_mem = state_mem[state_idx].to(exec_dev)
orig_power = state_power[state_idx].to(exec_dev)
......@@ -695,38 +824,48 @@ class SparseAdam(SparseGradOptimizer):
orig_mem = orig_mem.to(dtype=exec_dtype)
orig_power = orig_power.to(dtype=exec_dtype)
grad_values = th.zeros(
(grad_indices.shape[0], grad.shape[1]), device=exec_dev
)
grad_values.index_add_(0, inverse, grad)
grad_values = grad_values / cnt.unsqueeze(1)
grad_mem = grad_values
grad_power = grad_values * grad_values
update_mem = beta1 * orig_mem + (1.0 - beta1) * grad_mem
update_power = beta2 * orig_power + (1.0 - beta2) * grad_power
if use_uva:
scatter_pinned_tensor_rows(
state_mem, grad_indices, update_mem.to(dtype=self._dtype)
)
scatter_pinned_tensor_rows(
state_power,
grad_indices,
update_power.to(dtype=self._dtype),
)
else:
update_mem_dst = update_mem.to(dtype=self._dtype).to(
state_dev, non_blocking=True
)
update_power_dst = update_power.to(dtype=self._dtype).to(
state_dev, non_blocking=True
)
if state_block:
# use events to try and overlap CPU and GPU as much as possible
update_event = th.cuda.Event()
update_event.record()
update_mem_corr = update_mem / (
1.0 - th.pow(th.tensor(beta1, device=exec_dev), state_step)
).unsqueeze(1)
update_power_corr = update_power / (
1.0 - th.pow(th.tensor(beta2, device=exec_dev), state_step)
).unsqueeze(1)
std_values = (
clr * update_mem_corr / (th.sqrt(update_power_corr) + eps)
)
std_values_dst = std_values.to(state_dev, non_blocking=True)
if state_block:
......
......@@ -2,22 +2,26 @@
import os
import re
import time
import numpy as np
from . import backend as F
from . import utils
from ._ffi.function import _init_api
from .base import EID, ETYPE, NID, NTYPE
from .heterograph import DGLHeteroGraph
from .ndarray import NDArray
from .subgraph import edge_subgraph
__all__ = [
"metis_partition",
"metis_partition_assignment",
"partition_graph_with_halo",
]
def reorder_nodes(g, new_node_ids):
""" Generate a new graph with new node IDs.
"""Generate a new graph with new node IDs.
We assign each node in the input graph with a new node ID. This results in
a new graph.
......@@ -33,25 +37,29 @@ def reorder_nodes(g, new_node_ids):
DGLGraph
The graph with new node IDs.
"""
assert (
len(new_node_ids) == g.number_of_nodes()
), "The number of new node ids must match #nodes in the graph."
new_node_ids = utils.toindex(new_node_ids)
sorted_ids, idx = F.sort_1d(new_node_ids.tousertensor())
assert (
F.asnumpy(sorted_ids[0]) == 0
and F.asnumpy(sorted_ids[-1]) == g.number_of_nodes() - 1
), "The new node IDs are incorrect."
new_gidx = _CAPI_DGLReorderGraph_Hetero(
g._graph, new_node_ids.todgltensor()
)
new_g = DGLHeteroGraph(gidx=new_gidx, ntypes=["_N"], etypes=["_E"])
new_g.ndata["orig_id"] = idx
return new_g
def _get_halo_heterosubgraph_inner_node(halo_subg):
return _CAPI_GetHaloSubgraphInnerNodes_Hetero(halo_subg)
def reshuffle_graph(g, node_part=None):
"""Reshuffle node ids and edge IDs of a graph.
This function reshuffles nodes and edges in a graph so that all nodes/edges of the same type
have contiguous IDs. If a graph is partitioned and nodes are assigned to different partitions,
......@@ -71,11 +79,11 @@ def reshuffle_graph(g, node_part=None):
(DGLGraph, Tensor)
The graph whose nodes and edges are reshuffled.
The 1D tensor that indicates the partition IDs of the nodes in the reshuffled graph.
"""
# In this case, we don't need to reshuffle node IDs and edge IDs.
if node_part is None:
g.ndata["orig_id"] = F.arange(0, g.number_of_nodes())
g.edata["orig_id"] = F.arange(0, g.number_of_edges())
return g, None
start = time.time()
......@@ -89,38 +97,48 @@ def reshuffle_graph(g, node_part=None):
if is_hetero:
num_node_types = F.max(g.ndata[NTYPE], 0) + 1
if node_part is not None:
sorted_part, new2old_map = F.sort_1d(
node_part * num_node_types + g.ndata[NTYPE]
)
else:
sorted_part, new2old_map = F.sort_1d(g.ndata[NTYPE])
sorted_part = F.floor_div(sorted_part, num_node_types)
elif node_part is not None:
sorted_part, new2old_map = F.sort_1d(node_part)
else:
g.ndata["orig_id"] = g.ndata[NID]
g.edata["orig_id"] = g.edata[EID]
return g, None
new_node_ids = np.zeros((g.number_of_nodes(),), dtype=np.int64)
new_node_ids[F.asnumpy(new2old_map)] = np.arange(0, g.number_of_nodes())
# If the input graph is homogeneous, we only need to create an empty array, so that
# _CAPI_DGLReassignEdges_Hetero knows how to handle it.
etype = (
g.edata[ETYPE]
if ETYPE in g.edata
else F.zeros((0), F.dtype(sorted_part), F.cpu())
)
g = reorder_nodes(g, new_node_ids)
node_part = utils.toindex(sorted_part)
# We reassign edges in in-CSR. In this way, after partitioning, we can ensure
# that all edges in a partition are in the contiguous ID space.
etype_idx = utils.toindex(etype)
orig_eids = _CAPI_DGLReassignEdges_Hetero(
g._graph, etype_idx.todgltensor(), node_part.todgltensor(), True
)
orig_eids = utils.toindex(orig_eids)
orig_eids = orig_eids.tousertensor()
g.edata["orig_id"] = orig_eids
print(
"Reshuffle nodes and edges: {:.3f} seconds".format(time.time() - start)
)
return g, node_part.tousertensor()
def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
"""Partition a graph.
Based on the given node assignments for each partition, the function splits
the input graph into subgraphs. A subgraph may contain HALO nodes which do
......@@ -156,20 +174,21 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
Tensor
1D tensor that stores the mapping between the reshuffled edge IDs and
the original edge IDs if 'reshuffle=True'. Otherwise, return None.
"""
assert len(node_part) == g.number_of_nodes()
if reshuffle:
g, node_part = reshuffle_graph(g, node_part)
orig_nids = g.ndata["orig_id"]
orig_eids = g.edata["orig_id"]
node_part = utils.toindex(node_part)
start = time.time()
subgs = _CAPI_DGLPartitionWithHalo_Hetero(
g._graph, node_part.todgltensor(), extra_cached_hops
)
# g is no longer needed. Free memory.
g = None
print("Split the graph: {:.3f} seconds".format(time.time() - start))
subg_dict = {}
node_part = node_part.tousertensor()
start = time.time()
......@@ -182,14 +201,17 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
inner_nids = F.nonzero_1d(inner_node)
# TODO(zhengda) we need to fix utils.toindex() to avoid the dtype cast below.
inner_nids = F.astype(inner_nids, F.int64)
inner_eids = subg.in_edges(inner_nids, form="eid")
inner_edge = F.scatter_row(
inner_edge,
inner_eids,
F.ones((len(inner_eids),), F.dtype(inner_edge), F.cpu()),
)
return inner_edge
# This creates a subgraph from the subgraphs returned from the CAPI above.
def create_subgraph(subg, induced_nodes, induced_edges, inner_node):
subg1 = DGLHeteroGraph(gidx=subg.graph, ntypes=["_N"], etypes=["_E"])
# If IDs are shuffled, we should shuffled edges. This will help us collect edge data
# from the distributed graph after training.
if reshuffle:
......@@ -199,7 +221,9 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
# of outer edges with a large value, so we will get the sorted list as we want.
max_eid = F.max(induced_edges[0], 0) + 1
inner_edge = get_inner_edge(subg1, inner_node)
eid = F.astype(induced_edges[0], F.int64) + max_eid * F.astype(
inner_edge == 0, F.int64
)
_, index = F.sort_1d(eid)
subg1 = edge_subgraph(subg1, index, relabel_nodes=False)
......@@ -213,44 +237,49 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
for i, subg in enumerate(subgs):
inner_node = _get_halo_heterosubgraph_inner_node(subg)
inner_node = F.zerocopy_from_dlpack(inner_node.to_dlpack())
subg = create_subgraph(
subg, subg.induced_nodes, subg.induced_edges, inner_node
)
subg.ndata["inner_node"] = inner_node
subg.ndata["part_id"] = F.gather_row(node_part, subg.ndata[NID])
if reshuffle:
subg.ndata["orig_id"] = F.gather_row(orig_nids, subg.ndata[NID])
subg.edata["orig_id"] = F.gather_row(orig_eids, subg.edata[EID])
if extra_cached_hops >= 1:
inner_edge = get_inner_edge(subg, inner_node)
else:
inner_edge = F.ones((subg.number_of_edges(),), F.int8, F.cpu())
subg.edata["inner_edge"] = inner_edge
subg_dict[i] = subg
print("Construct subgraphs: {:.3f} seconds".format(time.time() - start))
if reshuffle:
return subg_dict, orig_nids, orig_eids
else:
return subg_dict, None, None
def get_peak_mem():
"""Get the peak memory size.
Returns
-------
float
The peak memory size in GB.
"""
if not os.path.exists("/proc/self/status"):
return 0.0
for line in open("/proc/self/status", "r"):
if "VmPeak" in line:
mem = re.findall(r"\d+", line)[0]
return int(mem) / 1024 / 1024
return 0.0
def metis_partition_assignment(
g, k, balance_ntypes=None, balance_edges=False, mode="k-way", objtype="cut"
):
"""This assigns nodes to different partitions with Metis partitioning algorithm.
When performing Metis partitioning, we can put some constraints on the partitioning.
Currently, it supports two constraints to balance the partitioning. By default, Metis
......@@ -284,16 +313,24 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False,
-------
a 1-D tensor
A vector with each element that indicates the partition ID of a vertex.
"""
assert mode in (
"k-way",
"recursive",
), "'mode' can only be 'k-way' or 'recursive'"
assert (
g.idtype == F.int64
), "IdType of graph is required to be int64 for now."
# METIS works only on symmetric graphs.
# The METIS runs on the symmetric graph to generate the node assignment to partitions.
start = time.time()
sym_gidx = _CAPI_DGLMakeSymmetric_Hetero(g._graph)
sym_g = DGLHeteroGraph(gidx=sym_gidx)
print(
"Convert a graph into a bidirected graph: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
vwgt = []
# To balance the node types in each partition, we can take advantage of the vertex weights
in Metis. When vertex weights are provided, Metis will try to generate partitions with
......@@ -307,8 +344,9 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False,
# of weights is the same as the number of node types.
start = time.time()
if balance_ntypes is not None:
assert (
len(balance_ntypes) == g.number_of_nodes()
), "The length of balance_ntypes should be equal to #nodes in the graph"
balance_ntypes = F.tensor(balance_ntypes)
uniq_ntypes = F.unique(balance_ntypes)
for ntype in uniq_ntypes:
......@@ -328,19 +366,31 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False,
# The vertex weights have to be stored in a vector.
if len(vwgt) > 0:
vwgt = F.stack(vwgt, 1)
shape = (
np.prod(
F.shape(vwgt),
),
)
vwgt = F.reshape(vwgt, shape)
vwgt = F.to_dgl_nd(vwgt)
else:
vwgt = F.zeros((0,), F.int64, F.cpu())
vwgt = F.to_dgl_nd(vwgt)
print(
"Construct multi-constraint weights: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
start = time.time()
node_part = _CAPI_DGLMetisPartition_Hetero(
sym_g._graph, k, vwgt, mode, (objtype == "cut")
)
print(
"Metis partitioning: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
if len(node_part) == 0:
return None
else:
......@@ -348,9 +398,16 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False,
return node_part.tousertensor()
def metis_partition(
g,
k,
extra_cached_hops=0,
reshuffle=False,
balance_ntypes=None,
balance_edges=False,
mode="k-way",
):
"""This is to partition a graph with Metis partitioning.
Metis assigns vertices to partitions. This API constructs subgraphs with the vertices assigned
to the partitions and their incoming edges. A subgraph may contain HALO nodes which do
......@@ -398,18 +455,25 @@ def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
--------
a dict of DGLGraphs
The key is the partition ID and the value is the DGLGraph of the partition.
"""
assert mode in (
"k-way",
"recursive",
), "'mode' can only be 'k-way' or 'recursive'"
node_part = metis_partition_assignment(
g, k, balance_ntypes, balance_edges, mode
)
if node_part is None:
return None
# Then we split the original graph into parts based on the METIS partitioning results.
return partition_graph_with_halo(
g, node_part, extra_cached_hops, reshuffle
)[0]
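# Illustrative usage sketch (not part of this diff); assumes DGL was built with METIS
# support and the graph uses int64 IDs (the default for dgl.rand_graph).
import dgl
from dgl.partition import metis_partition
g = dgl.rand_graph(1000, 5000)
parts = metis_partition(g, k=4, extra_cached_hops=1)
for pid, subg in parts.items():
    # inner_node marks nodes owned by the partition (vs. cached HALO nodes)
    print(pid, subg.num_nodes(), int(subg.ndata["inner_node"].sum()))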
class NDArrayPartition(object):
""" Create a new partition of an NDArray. That is, an object which assigns
"""Create a new partition of an NDArray. That is, an object which assigns
each row of an NDArray to a specific partition.
Parameters
......@@ -456,69 +520,78 @@ class NDArrayPartition(object):
>>> part = NDArrayPartition(g.num_nodes(), num_parts, mode='range',
... part_ranges=part_range)
"""
def __init__(
self, array_size, num_parts, mode="remainder", part_ranges=None
):
assert num_parts > 0, 'Invalid "num_parts", must be > 0.'
if mode == "remainder":
assert part_ranges is None, (
"When using remainder-based "
'partitioning, "part_ranges" should not be specified.'
)
self._partition = _CAPI_DGLNDArrayPartitionCreateRemainderBased(
array_size, num_parts
)
elif mode == "range":
assert part_ranges is not None, (
"When using range-based "
'partitioning, "part_ranges" must not be None.'
)
assert part_ranges[0] == 0 and part_ranges[-1] == array_size, (
"part_ranges[0] must be 0, and part_ranges[-1] must be "
'"array_size".'
)
if F.is_tensor(part_ranges):
part_ranges = F.zerocopy_to_dgl_ndarray(part_ranges)
assert isinstance(part_ranges, NDArray), (
'"part_ranges" must ' "be Tensor or dgl.NDArray."
)
self._partition = _CAPI_DGLNDArrayPartitionCreateRangeBased(
array_size, num_parts, part_ranges
)
else:
assert False, 'Unknown partition mode "{}"'.format(mode)
self._array_size = array_size
self._num_parts = num_parts
def num_parts(self):
""" Get the number of partitions.
"""
"""Get the number of partitions."""
return self._num_parts
def array_size(self):
""" Get the total size of the first dimension of the partitioned array.
"""
"""Get the total size of the first dimension of the partitioned array."""
return self._array_size
def get(self):
""" Get the C-handle for this object.
"""
"""Get the C-handle for this object."""
return self._partition
def get_local_indices(self, part, ctx):
""" Get the set of global indices in this given partition.
"""
return self.map_to_global(F.arange(0, self.local_size(part), ctx=ctx), part)
"""Get the set of global indices in this given partition."""
return self.map_to_global(
F.arange(0, self.local_size(part), ctx=ctx), part
)
def local_size(self, part):
""" Get the number of rows/items assigned to the given part.
"""
"""Get the number of rows/items assigned to the given part."""
return _CAPI_DGLNDArrayPartitionGetPartSize(self._partition, part)
def map_to_local(self, idxs):
""" Convert the set of global indices to local indices
"""
return F.zerocopy_from_dgl_ndarray(_CAPI_DGLNDArrayPartitionMapToLocal(
self._partition,
F.zerocopy_to_dgl_ndarray(idxs)))
"""Convert the set of global indices to local indices"""
return F.zerocopy_from_dgl_ndarray(
_CAPI_DGLNDArrayPartitionMapToLocal(
self._partition, F.zerocopy_to_dgl_ndarray(idxs)
)
)
def map_to_global(self, idxs, part_id):
""" Convert the set of local indices ot global indices
"""
return F.zerocopy_from_dgl_ndarray(_CAPI_DGLNDArrayPartitionMapToGlobal(
self._partition, F.zerocopy_to_dgl_ndarray(idxs), part_id))
"""Convert the set of local indices ot global indices"""
return F.zerocopy_from_dgl_ndarray(
_CAPI_DGLNDArrayPartitionMapToGlobal(
self._partition, F.zerocopy_to_dgl_ndarray(idxs), part_id
)
)
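# Small sketch (not part of this diff): a remainder-based partition assigns row i of a
# 10-row array to part i % 2; the getters below are plain Python accessors.
from dgl.partition import NDArrayPartition
part = NDArrayPartition(10, 2, mode="remainder")
print(part.num_parts(), part.array_size())   # 2 10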
_init_api("dgl.partition")
......@@ -5,14 +5,22 @@ from . import backend as F
from . import traversal as trv
from .heterograph import DGLHeteroGraph
__all__ = [
"prop_nodes",
"prop_nodes_bfs",
"prop_nodes_topo",
"prop_edges",
"prop_edges_dfs",
]
def prop_nodes(
graph,
nodes_generator,
message_func="default",
reduce_func="default",
apply_node_func="default",
):
"""Functional method for :func:`dgl.DGLGraph.prop_nodes`.
Parameters
......@@ -30,13 +38,18 @@ def prop_nodes(graph,
--------
dgl.DGLGraph.prop_nodes
"""
graph.prop_nodes(
nodes_generator, message_func, reduce_func, apply_node_func
)
def prop_edges(
graph,
edges_generator,
message_func="default",
reduce_func="default",
apply_node_func="default",
):
"""Functional method for :func:`dgl.DGLGraph.prop_edges`.
Parameters
......@@ -54,14 +67,19 @@ def prop_edges(graph,
--------
dgl.DGLGraph.prop_edges
"""
graph.prop_edges(
edges_generator, message_func, reduce_func, apply_node_func
)
def prop_nodes_bfs(
graph,
source,
message_func,
reduce_func,
reverse=False,
apply_node_func=None,
):
"""Message propagation using node frontiers generated by BFS.
Parameters
......@@ -83,10 +101,12 @@ def prop_nodes_bfs(graph,
--------
dgl.traversal.bfs_nodes_generator
"""
assert isinstance(
graph, DGLHeteroGraph
), "DGLGraph is deprecated, Please use DGLHeteroGraph"
assert (
len(graph.canonical_etypes) == 1
), "prop_nodes_bfs only support homogeneous graph"
# TODO(murphy): Graph traversal currently is only supported on
# CPP graphs. Move graph to CPU as a workaround,
# which should be fixed in the future.
......@@ -94,11 +114,10 @@ def prop_nodes_bfs(graph,
nodes_gen = [F.copy_to(frontier, graph.device) for frontier in nodes_gen]
prop_nodes(graph, nodes_gen, message_func, reduce_func, apply_node_func)
def prop_nodes_topo(graph,
message_func,
reduce_func,
reverse=False,
apply_node_func=None):
def prop_nodes_topo(
graph, message_func, reduce_func, reverse=False, apply_node_func=None
):
"""Message propagation using node frontiers generated by topological order.
Parameters
......@@ -118,10 +137,12 @@ def prop_nodes_topo(graph,
--------
dgl.traversal.topological_nodes_generator
"""
assert isinstance(graph, DGLHeteroGraph), \
'DGLGraph is deprecated, Please use DGLHeteroGraph'
assert len(graph.canonical_etypes) == 1, \
'prop_nodes_topo only support homogeneous graph'
assert isinstance(
graph, DGLHeteroGraph
), "DGLGraph is deprecated, Please use DGLHeteroGraph"
assert (
len(graph.canonical_etypes) == 1
), "prop_nodes_topo only support homogeneous graph"
# TODO(murphy): Graph traversal currently is only supported on
# CPP graphs. Move graph to CPU as a workaround,
# which should be fixed in the future.
......@@ -129,14 +150,17 @@ def prop_nodes_topo(graph,
nodes_gen = [F.copy_to(frontier, graph.device) for frontier in nodes_gen]
prop_nodes(graph, nodes_gen, message_func, reduce_func, apply_node_func)
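A hedged sketch for prop_nodes_topo on a small DAG (same assumptions as the sketches above):

import torch
import dgl
import dgl.function as fn

# DAG 0 -> 1 -> 2; topological frontiers are [0], [1], [2].
g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
g.ndata["h"] = torch.tensor([[1.0], [0.0], [0.0]])
dgl.prop_nodes_topo(g, fn.copy_u("h", "m"), fn.sum("m", "h"))
# Node 0's feature flows to node 1, then node 1's updated value flows to node 2.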
def prop_edges_dfs(graph,
source,
message_func,
reduce_func,
reverse=False,
has_reverse_edge=False,
has_nontree_edge=False,
apply_node_func=None):
def prop_edges_dfs(
graph,
source,
message_func,
reduce_func,
reverse=False,
has_reverse_edge=False,
has_nontree_edge=False,
apply_node_func=None,
):
"""Message propagation using edge frontiers generated by labeled DFS.
Parameters
......@@ -162,15 +186,22 @@ def prop_edges_dfs(graph,
--------
dgl.traversal.dfs_labeled_edges_generator
"""
assert isinstance(graph, DGLHeteroGraph), \
'DGLGraph is deprecated, Please use DGLHeteroGraph'
assert len(graph.canonical_etypes) == 1, \
'prop_edges_dfs only support homogeneous graph'
assert isinstance(
graph, DGLHeteroGraph
), "DGLGraph is deprecated, Please use DGLHeteroGraph"
assert (
len(graph.canonical_etypes) == 1
), "prop_edges_dfs only support homogeneous graph"
# TODO(murphy): Graph traversal currently is only supported on
# CPP graphs. Move graph to CPU as a workaround,
# which should be fixed in the future.
edges_gen = trv.dfs_labeled_edges_generator(
graph.cpu(), source, reverse, has_reverse_edge, has_nontree_edge,
return_labels=False)
graph.cpu(),
source,
reverse,
has_reverse_edge,
has_nontree_edge,
return_labels=False,
)
edges_gen = [F.copy_to(frontier, graph.device) for frontier in edges_gen]
prop_edges(graph, edges_gen, message_func, reduce_func, apply_node_func)
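And a hedged sketch for prop_edges_dfs on a small tree (same assumptions as above; `source` is a list of root nodes):

import torch
import dgl
import dgl.function as fn

# Tree rooted at 0: edges 0->1, 0->2, 1->3; DFS from the root yields edge frontiers.
g = dgl.graph((torch.tensor([0, 0, 1]), torch.tensor([1, 2, 3])))
g.ndata["h"] = torch.ones(4, 1)
dgl.prop_edges_dfs(g, [0], fn.copy_u("h", "m"), fn.sum("m", "h"))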
"""Python interfaces to DGL random number generators."""
import numpy as np
from ._ffi.function import _init_api
from . import backend as F
from . import ndarray as nd
from ._ffi.function import _init_api
__all__ = ["seed"]
__all__ = ['seed']
def seed(val):
"""Set the random seed of DGL.
......@@ -17,6 +18,7 @@ def seed(val):
"""
_CAPI_SetSeed(val)
def choice(a, size, replace=True, prob=None): # pylint: disable=invalid-name
"""An equivalent to :func:`numpy.random.choice`.
......@@ -57,7 +59,7 @@ def choice(a, size, replace=True, prob=None): # pylint: disable=invalid-name
samples : 1-D tensor
The generated random samples
"""
#TODO(minjie): support RNG as one of the arguments.
# TODO(minjie): support RNG as one of the arguments.
if isinstance(size, tuple):
num = np.prod(size)
else:
......@@ -74,7 +76,9 @@ def choice(a, size, replace=True, prob=None): # pylint: disable=invalid-name
prob = F.zerocopy_to_dgl_ndarray(prob)
bits = 64 # index array is in 64-bit
chosen_idx = _CAPI_Choice(int(num), int(population), prob, bool(replace), bits)
chosen_idx = _CAPI_Choice(
int(num), int(population), prob, bool(replace), bits
)
chosen_idx = F.zerocopy_from_dgl_ndarray(chosen_idx)
if F.is_tensor(a):
......@@ -87,4 +91,5 @@ def choice(a, size, replace=True, prob=None): # pylint: disable=invalid-name
else:
return chosen
_init_api('dgl.rng', __name__)
_init_api("dgl.rng", __name__)
"""Classes and functions for batching multiple graphs together."""
from __future__ import absolute_import
from .base import DGLError, dgl_warning
from . import backend as F
from .base import DGLError, dgl_warning
from .ops import segment
__all__ = ['readout_nodes', 'readout_edges',
'sum_nodes', 'sum_edges', 'mean_nodes', 'mean_edges',
'max_nodes', 'max_edges', 'softmax_nodes', 'softmax_edges',
'broadcast_nodes', 'broadcast_edges', 'topk_nodes', 'topk_edges']
def readout_nodes(graph, feat, weight=None, *, op='sum', ntype=None):
__all__ = [
"readout_nodes",
"readout_edges",
"sum_nodes",
"sum_edges",
"mean_nodes",
"mean_edges",
"max_nodes",
"max_edges",
"softmax_nodes",
"softmax_edges",
"broadcast_nodes",
"broadcast_edges",
"topk_nodes",
"topk_edges",
]
def readout_nodes(graph, feat, weight=None, *, op="sum", ntype=None):
"""Generate a graph-level representation by aggregating node features
:attr:`feat`.
......@@ -87,7 +100,8 @@ def readout_nodes(graph, feat, weight=None, *, op='sum', ntype=None):
x = x * graph.nodes[ntype].data[weight]
return segment.segment_reduce(graph.batch_num_nodes(ntype), x, reducer=op)
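A hedged end-to-end sketch of readout_nodes on a batched graph (assuming dgl.graph/dgl.batch and a PyTorch backend):

import torch
import dgl

g1 = dgl.graph((torch.tensor([0]), torch.tensor([1])))          # 2 nodes
g1.ndata["h"] = torch.tensor([[1.0], [2.0]])
g2 = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))    # 3 nodes
g2.ndata["h"] = torch.tensor([[1.0], [2.0], [3.0]])
bg = dgl.batch([g1, g2])
dgl.readout_nodes(bg, "h")             # per-graph sums: tensor([[3.], [6.]])
dgl.readout_nodes(bg, "h", op="max")   # per-graph maxima: tensor([[2.], [3.]])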
def readout_edges(graph, feat, weight=None, *, op='sum', etype=None):
def readout_edges(graph, feat, weight=None, *, op="sum", etype=None):
"""Sum the edge feature :attr:`feat` in :attr:`graph`, optionally
multiplies it by an edge :attr:`weight`.
......@@ -170,6 +184,7 @@ def readout_edges(graph, feat, weight=None, *, op='sum', etype=None):
x = x * graph.edges[etype].data[weight]
return segment.segment_reduce(graph.batch_num_edges(etype), x, reducer=op)
def sum_nodes(graph, feat, weight=None, *, ntype=None):
"""Syntax sugar for ``dgl.readout_nodes(graph, feat, weight, ntype=ntype, op='sum')``.
......@@ -177,7 +192,8 @@ def sum_nodes(graph, feat, weight=None, *, ntype=None):
--------
readout_nodes
"""
return readout_nodes(graph, feat, weight, ntype=ntype, op='sum')
return readout_nodes(graph, feat, weight, ntype=ntype, op="sum")
def sum_edges(graph, feat, weight=None, *, etype=None):
"""Syntax sugar for ``dgl.readout_edges(graph, feat, weight, etype=etype, op='sum')``.
......@@ -186,7 +202,8 @@ def sum_edges(graph, feat, weight=None, *, etype=None):
--------
readout_edges
"""
return readout_edges(graph, feat, weight, etype=etype, op='sum')
return readout_edges(graph, feat, weight, etype=etype, op="sum")
def mean_nodes(graph, feat, weight=None, *, ntype=None):
"""Syntax sugar for ``dgl.readout_nodes(graph, feat, weight, ntype=ntype, op='mean')``.
......@@ -195,7 +212,8 @@ def mean_nodes(graph, feat, weight=None, *, ntype=None):
--------
readout_nodes
"""
return readout_nodes(graph, feat, weight, ntype=ntype, op='mean')
return readout_nodes(graph, feat, weight, ntype=ntype, op="mean")
def mean_edges(graph, feat, weight=None, *, etype=None):
"""Syntax sugar for ``dgl.readout_edges(graph, feat, weight, etype=etype, op='mean')``.
......@@ -204,7 +222,8 @@ def mean_edges(graph, feat, weight=None, *, etype=None):
--------
readout_edges
"""
return readout_edges(graph, feat, weight, etype=etype, op='mean')
return readout_edges(graph, feat, weight, etype=etype, op="mean")
def max_nodes(graph, feat, weight=None, *, ntype=None):
"""Syntax sugar for ``dgl.readout_nodes(graph, feat, weight, ntype=ntype, op='max')``.
......@@ -213,7 +232,8 @@ def max_nodes(graph, feat, weight=None, *, ntype=None):
--------
readout_nodes
"""
return readout_nodes(graph, feat, weight, ntype=ntype, op='max')
return readout_nodes(graph, feat, weight, ntype=ntype, op="max")
def max_edges(graph, feat, weight=None, *, etype=None):
"""Syntax sugar for ``dgl.readout_edges(graph, feat, weight, etype=etype, op='max')``.
......@@ -222,7 +242,8 @@ def max_edges(graph, feat, weight=None, *, etype=None):
--------
readout_edges
"""
return readout_edges(graph, feat, weight, etype=etype, op='max')
return readout_edges(graph, feat, weight, etype=etype, op="max")
def softmax_nodes(graph, feat, *, ntype=None):
r"""Perform graph-wise softmax on the node features.
......@@ -283,6 +304,7 @@ def softmax_nodes(graph, feat, *, ntype=None):
x = graph.nodes[ntype].data[feat]
return segment.segment_softmax(graph.batch_num_nodes(ntype), x)
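A hedged one-liner sketch of softmax_nodes (same dgl/torch assumptions as above): the softmax is taken over all nodes of each graph in the batch, per feature column.

import torch
import dgl

g = dgl.graph((torch.tensor([0]), torch.tensor([1])))
g.ndata["h"] = torch.zeros(2, 1)
dgl.softmax_nodes(g, "h")   # tensor([[0.5], [0.5]]) for this single two-node graph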
def softmax_edges(graph, feat, *, etype=None):
r"""Perform graph-wise softmax on the edge features.
......@@ -348,6 +370,7 @@ def softmax_edges(graph, feat, *, etype=None):
x = graph.edges[etype].data[feat]
return segment.segment_softmax(graph.batch_num_edges(etype), x)
def broadcast_nodes(graph, graph_feat, *, ntype=None):
"""Generate a node feature equal to the graph-level feature :attr:`graph_feat`.
......@@ -416,12 +439,15 @@ def broadcast_nodes(graph, graph_feat, *, ntype=None):
--------
broadcast_edges
"""
if (F.shape(graph_feat)[0] != graph.batch_size and graph.batch_size == 1):
dgl_warning('For a single graph, use a tensor of shape (1, *) for graph_feat.'
' The support of shape (*) will be deprecated.')
if F.shape(graph_feat)[0] != graph.batch_size and graph.batch_size == 1:
dgl_warning(
"For a single graph, use a tensor of shape (1, *) for graph_feat."
" The support of shape (*) will be deprecated."
)
graph_feat = F.unsqueeze(graph_feat, dim=0)
return F.repeat(graph_feat, graph.batch_num_nodes(ntype), dim=0)
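A hedged sketch of broadcast_nodes on a batch of two graphs (same dgl/torch assumptions): each graph's row of graph_feat is repeated once per node of that graph.

import torch
import dgl

g1 = dgl.graph((torch.tensor([0]), torch.tensor([1])))          # 2 nodes
g2 = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))    # 3 nodes
bg = dgl.batch([g1, g2])
graph_feat = torch.tensor([[1.0], [2.0]])                       # one row per graph
dgl.broadcast_nodes(bg, graph_feat)
# -> tensor([[1.], [1.], [2.], [2.], [2.]])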
def broadcast_edges(graph, graph_feat, *, etype=None):
"""Generate an edge feature equal to the graph-level feature :attr:`graph_feat`.
......@@ -487,17 +513,21 @@ def broadcast_edges(graph, graph_feat, *, etype=None):
--------
broadcast_nodes
"""
if (F.shape(graph_feat)[0] != graph.batch_size and graph.batch_size == 1):
dgl_warning('For a single graph, use a tensor of shape (1, *) for graph_feat.'
' The support of shape (*) will be deprecated.')
if F.shape(graph_feat)[0] != graph.batch_size and graph.batch_size == 1:
dgl_warning(
"For a single graph, use a tensor of shape (1, *) for graph_feat."
" The support of shape (*) will be deprecated."
)
graph_feat = F.unsqueeze(graph_feat, dim=0)
return F.repeat(graph_feat, graph.batch_num_edges(etype), dim=0)
READOUT_ON_ATTRS = {
'nodes': ('ndata', 'batch_num_nodes', 'number_of_nodes'),
'edges': ('edata', 'batch_num_edges', 'number_of_edges'),
"nodes": ("ndata", "batch_num_nodes", "number_of_nodes"),
"edges": ("edata", "batch_num_edges", "number_of_edges"),
}
def _topk_torch(keys, k, descending, x):
"""Internal function to take graph-wise top-k node/edge features according to
the rank given by keys; this function is PyTorch only.
......@@ -522,14 +552,18 @@ def _topk_torch(keys, k, descending, x):
A tensor with shape :math:`(batch, k)`.
"""
import torch as th
batch_size, max_len = x.shape[0], x.shape[1]
topk_indices = keys.topk(k, -1, largest=descending)[1] # (batch_size, k)
x = x.view((batch_size * max_len), -1)
shift = th.arange(0, batch_size, device=x.device).view(batch_size, 1) * max_len
shift = (
th.arange(0, batch_size, device=x.device).view(batch_size, 1) * max_len
)
topk_indices_ = topk_indices + shift
x = x[topk_indices_].view(batch_size, k, -1)
return th.masked_fill(x, th.isinf(x), 0), topk_indices
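The flatten-and-shift gather used above is easy to see on a toy tensor; a minimal PyTorch-only sketch (all names here are illustrative):

import torch as th

batch_size, max_len, d, k = 2, 4, 3, 2
x = th.randn(batch_size, max_len, d)
keys = x[..., 0]                              # rank each graph's rows by feature 0
topk_idx = keys.topk(k, -1, largest=True)[1]  # (batch_size, k) row ids within each graph
flat = x.view(batch_size * max_len, d)        # flatten the batch dimension away
shift = th.arange(batch_size).view(-1, 1) * max_len
gathered = flat[topk_idx + shift]             # (batch_size, k, d), gathered per graph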
def _topk_on(graph, typestr, feat, k, descending, sortby, ntype_or_etype):
"""Internal function to take graph-wise top-k node/edge features of
field :attr:`feat` in :attr:`graph` ranked by keys at given
......@@ -577,24 +611,28 @@ def _topk_on(graph, typestr, feat, k, descending, sortby, ntype_or_etype):
_, batch_num_objs_attr, _ = READOUT_ON_ATTRS[typestr]
data = getattr(graph, typestr)[ntype_or_etype].data
if F.ndim(data[feat]) > 2:
raise DGLError('Only support {} feature `{}` with dimension less than or'
' equal to 2'.format(typestr, feat))
raise DGLError(
"Only support {} feature `{}` with dimension less than or"
" equal to 2".format(typestr, feat)
)
feat = data[feat]
hidden_size = F.shape(feat)[-1]
batch_num_objs = getattr(graph, batch_num_objs_attr)(ntype_or_etype)
batch_size = len(batch_num_objs)
length = max(max(F.asnumpy(batch_num_objs)), k)
fill_val = -float('inf') if descending else float('inf')
feat_ = F.pad_packed_tensor(feat, batch_num_objs, fill_val, l_min=k) # (batch_size, l, d)
fill_val = -float("inf") if descending else float("inf")
feat_ = F.pad_packed_tensor(
feat, batch_num_objs, fill_val, l_min=k
) # (batch_size, l, d)
if F.backend_name == 'pytorch' and sortby is not None:
if F.backend_name == "pytorch" and sortby is not None:
# PyTorch's implementation of top-K
keys = feat_[..., sortby] # (batch_size, l)
return _topk_torch(keys, k, descending, feat_)
else:
# Fallback to framework-agnostic implementation of top-K
if sortby is not None:
keys = F.squeeze(F.slice_axis(feat_, -1, sortby, sortby+1), -1)
keys = F.squeeze(F.slice_axis(feat_, -1, sortby, sortby + 1), -1)
order = F.argsort(keys, -1, descending=descending)
else:
order = F.argsort(feat_, 1, descending=descending)
......@@ -607,14 +645,18 @@ def _topk_on(graph, typestr, feat, k, descending, sortby, ntype_or_etype):
topk_indices_ = F.reshape(topk_indices, (-1,)) + shift
else:
feat_ = F.reshape(feat_, (-1,))
shift = F.repeat(F.arange(0, batch_size), k * hidden_size, -1) * length * hidden_size +\
F.cat([F.arange(0, hidden_size)] * batch_size * k, -1)
shift = F.repeat(
F.arange(0, batch_size), k * hidden_size, -1
) * length * hidden_size + F.cat(
[F.arange(0, hidden_size)] * batch_size * k, -1
)
shift = F.copy_to(shift, F.context(feat))
topk_indices_ = F.reshape(topk_indices, (-1,)) * hidden_size + shift
out = F.reshape(F.gather_row(feat_, topk_indices_), (batch_size, k, -1))
out = F.replace_inf_with_zero(out)
return out, topk_indices
def topk_nodes(graph, feat, k, *, descending=True, sortby=None, ntype=None):
"""Return a graph-level representation by a graph-wise top-k on
node features :attr:`feat` in :attr:`graph` by feature at index :attr:`sortby`.
......@@ -719,9 +761,16 @@ def topk_nodes(graph, feat, k, *, descending=True, sortby=None, ntype=None):
[3, 2, 0, 2, 2],
[2, 3, 2, 1, 3]]]))
"""
return _topk_on(graph, 'nodes', feat, k,
descending=descending, sortby=sortby,
ntype_or_etype=ntype)
return _topk_on(
graph,
"nodes",
feat,
k,
descending=descending,
sortby=sortby,
ntype_or_etype=ntype,
)
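A hedged sketch of topk_nodes (same dgl/torch assumptions): ranking by feature column 0 keeps the full rows of the top-k nodes and also returns their node ids.

import torch
import dgl

g = dgl.graph((torch.tensor([0, 1, 2]), torch.tensor([1, 2, 3])))
g.ndata["h"] = torch.tensor([[1.0, 4.0], [3.0, 2.0], [2.0, 6.0], [4.0, 1.0]])
vals, idx = dgl.topk_nodes(g, "h", 2, sortby=0)
# vals: tensor([[[4., 1.], [3., 2.]]]), idx: tensor([[3, 1]])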
def topk_edges(graph, feat, k, *, descending=True, sortby=None, etype=None):
"""Return a graph-level representation by a graph-wise top-k
......@@ -827,6 +876,12 @@ def topk_edges(graph, feat, k, *, descending=True, sortby=None, etype=None):
[3, 2, 0, 2, 2],
[2, 3, 2, 1, 3]]]))
"""
return _topk_on(graph, 'edges', feat, k,
descending=descending, sortby=sortby,
ntype_or_etype=etype)
return _topk_on(
graph,
"edges",
feat,
k,
descending=descending,
sortby=sortby,
ntype_or_etype=etype,
)