"src/vscode:/vscode.git/clone" did not exist on "cb5e3489c0694107295bc70804de1559e97cdb89"
Unverified commit 7241a9c0, authored by Minjie Wang and committed by GitHub

[Backend] backend interface (#109)

* backend interface

* small fix

* more comments to the data type dict

* WIP

* convert_to and narrow

* WIP

* pytorch and numpy backend; WIP on mxnet backend

* mxnet backend

* narrow

* Fix all usages

* fix for mx

* fix for mx

* fix mx

* fix mx

* fix mx

* fix mx

* fix mx

* fix mx

* fix mx

* revert jenkins

* add sparse_matrix api

* sparse matrix api

* some fixme

* Fix as requested
parent b420a5b5
from __future__ import absolute_import
import os

__backend__ = os.environ.get('DGLBACKEND', 'pytorch').lower()
if __backend__ == 'numpy':
    from .numpy import *
    create_immutable_graph_index=None
elif __backend__ == 'pytorch':
    from .pytorch import *
    create_immutable_graph_index=None
elif __backend__ == 'mxnet':
    from .mxnet import *
    from .mxnet_immutable_graph_index import create_immutable_graph_index
else:
    raise Exception("Unsupported backend %s" % __backend__)

from __future__ import absolute_import
import sys, os
import importlib

from . import backend

_enabled_apis = set()

def _gen_missing_api(api, mod_name):
    def _missing_api(*args, **kwargs):
        raise ImportError('API "%s" is not supported by backend "%s".'
                          ' You can switch to other backends by setting'
                          ' the DGLBACKEND environment.' % (api, mod_name))
    return _missing_api

def _load_backend():
    mod_name = os.environ.get('DGLBACKEND', 'pytorch').lower()
    mod = importlib.import_module('.%s' % mod_name, __name__)
    thismod = sys.modules[__name__]
    for api in backend.__dict__.keys():
        if api == 'data_type_dict':
            # load data type
            if api not in mod.__dict__:
                raise ImportError('API "data_type_dict" is required but missing for'
                                  ' backend "%s".' % (mod_name))
            data_type_dict = mod.__dict__[api]()
            for name, dtype in data_type_dict.items():
                setattr(thismod, name, dtype)
        else:
            # load functions
            if api in mod.__dict__:
                _enabled_apis.add(api)
                setattr(thismod, api, mod.__dict__[api])
            else:
                setattr(thismod, api, _gen_missing_api(api, mod_name))

_load_backend()

def is_enabled(api):
    """Return true if the api is enabled by the current backend.
    Parameters
    ----------
    api : str
        The api name.
    Returns
    -------
    bool
        True if the API is enabled by the current backend.
    """
    return api in _enabled_apis
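For orientation, here is a minimal usage sketch of the loader above. It is not part of the commit; it assumes the package is importable as ``dgl.backend`` and that ``DGLBACKEND`` is set before the first import:

import os
os.environ['DGLBACKEND'] = 'pytorch'   # must happen before dgl is imported

from dgl import backend as F

x = F.tensor([[1.0, 2.0], [3.0, 4.0]], dtype=F.float32)
print(F.shape(x), F.dtype(x))

# Optional APIs degrade gracefully: a missing one is a stub that raises ImportError.
if F.is_enabled('zerocopy_to_dlpack'):
    capsule = F.zerocopy_to_dlpack(x)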
"""This file defines the unified tensor framework interface required by DGL.
The principles of this interface:
* There should be as few interfaces as possible.
* The interface is used by the DGL system, so a clean definition is more
  important than convenient usage.
* Default arguments should be avoided.
* Keyword or positional arguments should be avoided.
* Argument types should be easy to understand.
It is recommended that the frameworks implement all the interfaces. However, it is
also OK to skip some. The generated backend module has an ``is_enabled`` function
that returns whether the interface is supported by the framework or not.
"""
###############################################################################
# Tensor, data type and context interfaces
def data_type_dict():
"""Returns a dictionary from data type string to the data type.
The dictionary should include at least:
float16
float32
float64
uint8
int8
int16
int32
int64
This function will be called only *once* during the initialization of the
backend module. The returned dictionary will become the attributes of the
backend module.
Examples
--------
>>> import torch as th
>>> def data_type_dict():
>>> return { 'float16' : th.float16, 'float32' : th.float32, ... }
After the module is initialized.
>>> import backend as F
>>> F.float16 # this will point to torch.float16
Returns
-------
dict of str to data type
The data type dict.
"""
pass
def cpu():
"""Return a context object for CPU device."""
pass
def tensor(data, dtype=None):
"""Create a tensor given the data and data type.
Parameters
----------
data : input data
The interface should at least support list and numpy array.
The data is copied to a newly-allocated tensor.
dtype : data type, optional
It should be one of the values in the data type dict.
If it is None, the type should be inferred from the data.
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
def sparse_matrix(data, index, shape, force_format=False):
"""Create a sparse matrix.
Parameters
----------
data : Tensor
Data tensor. It should be of shape (nnz,).
index : tuple
This is used to support different sparse formats.
For COO format:
index=('coo', coord), where coord is of shape (2, nnz).
coord[0,:] should be the row index and coord[1,:] should be
the column index.
For CSR format:
index=('csr', indices, indptr), where indices is of shape (nnz,)
and indptr is of shape (nrows+1,). See ``scipy.sparse.csr_matrix``
for more documents on what each array means.
shape : tuple of int
The shape.
force_format : bool
If true, the returned sparse matrix must be stored in the same
format as the given index.
Returns
-------
SparseMatrix
The framework-specific sparse matrix. It can be stored in any format
unless force_format is True.
"""
pass
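For example, a hedged sketch of the COO convention described above; it assumes a backend that accepts an int64 coordinate tensor (e.g. the numpy backend in this PR):

from dgl import backend as F

# a 3x3 matrix with nonzeros at (0, 1), (1, 2) and (2, 0)
coord = F.tensor([[0, 1, 2], [1, 2, 0]], dtype=F.int64)
data = F.tensor([1.0, 1.0, 1.0], dtype=F.float32)
spmat = F.sparse_matrix(data, ('coo', coord), (3, 3))
# Unless force_format=True the backend may store the matrix in another format;
# sparse_matrix_indices(spmat) reports the format actually used.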
def sparse_matrix_indices(spmat):
"""Return the indices of the given sparse matrix.
Parameters
----------
spmat : SparseMatrix
The framework-specific sparse matrix.
Returns
-------
index : tuple
This is used to support different sparse formats.
For COO format:
index=('coo', coord), where coord is of shape (2, nnz).
coord[0,:] should be the row index and coord[1,:] should be
the column index.
For CSR format:
index=('csr', indices, indptr), where indices is of shape (nnz,)
and indptr is of shape (nrows+1,). See ``scipy.sparse.csr_matrix``
for more documents on what each array means.
"""
pass
def is_tensor(obj):
"""Returns true if the given object is a framework-specific tensor."""
pass
def shape(input):
"""Return the shape of the tensor.
Parameters
----------
input : Tensor
The input tensor.
Returns
-------
tuple of int
The tensor shape.
"""
pass
def dtype(input):
"""Return the data type of the tensor.
Parameters
----------
input : Tensor
The input tensor.
Returns
-------
data type
It should be one of the values in the data type dict.
"""
pass
def context(input):
"""Return the context/device of the input tensor.
Parameters
----------
input : Tensor
The input tensor.
Returns
-------
Context object
A framework-specific context object.
"""
pass
def astype(input, ty):
"""Convert the input tensor to the given data type.
Parameters
----------
input : Tensor
The input tensor.
ty : data type
It should be one of the values in the data type dict.
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
def asnumpy(input):
"""Convert the input tensor to numpy array.
The data is copied.
Parameters
----------
input : Tensor
The input tensor.
Returns
-------
numpy.ndarray
Numpy array.
"""
pass
def copy_to(input, ctx):
"""Copy the given tensor to the context.
Parameters
----------
input : Tensor
The input tensor
ctx :
A framework-specific context object.
Returns
-------
Tensor
The tensor on the given context.
"""
pass
###############################################################################
# Tensor functions on feature data
# --------------------------------
# These functions are performance critical, so it's better to have efficient
# implementation in each framework.
def sum(input, dim):
"""Reduce sum the input tensor along the given dim.
Parameters
----------
input : Tensor
The input tensor.
dim : int
The reduce dim.
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
def max(input, dim):
"""Reduce max the input tensor along the given dim.
Parameters
----------
input : Tensor
The input tensor.
dim : int
The reduce dim.
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
def cat(seq, dim):
"""Concat the sequence of tensors in the given dimension.
Parameters
----------
seq : list of Tensor
The tensor sequence.
dim : int
The concat dim.
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
def split(input, sizes_or_sections, dim):
    """Split the input tensor into chunks.
    If ``sizes_or_sections`` is an integer, then the tensor will
    be split into equal pieces.
    If ``sizes_or_sections`` is a list, then the tensor will be
    split into segments of the given sizes.
    Parameters
    ----------
    input : Tensor
        The input tensor.
    sizes_or_sections : int or list of int
        The size(s) of the chunks.
    dim : int
        The dimension to split along.
    Returns
    -------
    list of Tensor
        The split tensors.
    """
    pass
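A hedged sketch of the list form, which behaves the same across the backends added in this PR (the tensor contents are only for illustration):

from dgl import backend as F

x = F.tensor([[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]], dtype=F.float32)
first, rest = F.split(x, [2, 4], dim=0)
# first has shape (2, 1) and rest has shape (4, 1); the integer form instead
# cuts the dimension into equal pieces.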
def gather_row(data, row_index):
"""Slice out the data given the row index.
Parameters
----------
data : Tensor
The data tensor
row_index : Tensor
A 1-D integer tensor containing which rows to be sliced out.
Returns
-------
Tensor
The sliced data. The first dimension should be equal to ``len(row_index)``.
"""
pass
def narrow_row(x, start, stop):
"""Narrow down the tensor along the first dimension.
Parameters
----------
x : Tensor
The input tensor.
start : int
The start index (inclusive).
stop : int
The stop index (exclusive).
Returns
-------
Tensor
The narrowed tensor
Notes
-----
The returned tensor could be a view of the original tensor.
"""
pass
def scatter_row(data, row_index, value):
"""Write the value into the data tensor using the row index.
This is an out-of-place write, so it works with autograd.
Parameters
----------
data : Tensor
The data tensor to be updated.
row_index : Tensor
A 1-D integer tensor containing which rows to be updated.
value : Tensor
The new value.
Returns
-------
Tensor
The new data.
"""
pass
def scatter_row_inplace(data, row_index, value):
"""Write the value into the data tensor using the row index inplacely.
This is an inplace write so it will break the autograd.
Parameters
----------
data : Tensor
The data tensor to be updated.
row_index : Tensor
A 1-D integer tensor containing which rows to be updated.
value : Tensor
The new value.
"""
pass
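To make the distinction concrete, a small sketch assuming the PyTorch backend (the variable names are only for illustration):

from dgl import backend as F

data = F.zeros((4, 2), dtype=F.float32)
rows = F.tensor([1, 3], dtype=F.int64)
value = F.ones((2, 2), dtype=F.float32)

updated = F.scatter_row(data, rows, value)   # `data` itself is unchanged
F.scatter_row_inplace(data, rows, value)     # `data` is overwritten; not autograd-safe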
def squeeze(input, dim):
"""Remove the given dimension of size 1.
Parameters
----------
input : Tensor
The input tensor.
dim : int
The dimension to be squeezed.
Returns
-------
Tensor
The result tensor.
"""
pass
def unsqueeze(input, dim):
"""Add the given dimension of size 1.
Parameters
----------
input : Tensor
The input tensor.
dim : int
The dimension to be unsqueezed.
Returns
-------
Tensor
The result tensor.
"""
pass
def reshape(input, shape):
"""Reshape the tensor.
Parameters
----------
input : Tensor
The input tensor.
shape : tuple of int
The new shape.
Returns
-------
Tensor
The reshaped tensor.
"""
pass
def zeros(shape, dtype):
"""Create a zero tensor.
Parameters
----------
shape : tuple of int
The tensor shape.
dtype : data type
It should be one of the values in the data type dict.
Returns
-------
Tensor
The zero tensor.
"""
pass
def ones(shape, dtype):
"""Create a one tensor.
Parameters
----------
shape : tuple of int
The tensor shape.
dtype : data type
It should be one of the values in the data type dict.
Returns
-------
Tensor
The one tensor.
"""
pass
def spmm(x, y):
"""Multiply a sparse matrix with a dense matrix.
Parameters
----------
x : SparseTensor
The sparse matrix.
y : Tensor
The dense matrix.
Returns
-------
Tensor
The result dense matrix.
"""
pass
###############################################################################
# Tensor functions used *only* on index tensor
# ----------------
# These operators are light-weighted, so it is acceptable to fallback to
# numpy operators if currently missing in the framework. Ideally in the future,
# DGL should contain all the operations on index, so this set of operators
# should be gradually removed.
def unique(input):
"""Returns the unique scalar elements in a tensor.
Parameters
----------
input : Tensor
Must be a 1-D tensor.
Returns
-------
Tensor
A 1-D tensor containing unique elements.
"""
pass
def full_1d(length, fill_value):
"""Create a 1D tensor full of the fill_value.
Parameters
----------
length : int
The length of the vector.
fill_value : int
The filled value.
Returns
-------
Tensor
A result 1D tensor
"""
pass
def nonzero_1d(input):
"""Return the nonzero index of the given 1D input.
Parameters
----------
input : Tensor
Must be a 1D tensor.
Returns
-------
Tensor
A 1D integer tensor containing the nonzero indices.
"""
pass
def sort_1d(input):
"""Sort a 1D tensor (in ascending order) and also return the original index.
Parameters
----------
input : Tensor
The tensor to be sorted.
Returns
-------
Tensor
Sorted tensor.
Tensor
Index tensor of the elements in the original input.
"""
pass
def arange(start, stop):
"""Create a 1D range int64 tensor.
Parameters
----------
start : int
The range start.
stop : int
The range stop.
Returns
-------
Tensor
The result tensor.
"""
pass
def zerocopy_to_dlpack(input):
"""Create a dlpack tensor that shares the input memory.
Parameters
----------
input : Tensor
The input tensor
Returns
-------
dlpack capsule
A dlpack capsule that can be used by other framework.
"""
pass
def zerocopy_from_dlpack(dlpack_tensor):
"""Create a tensor that shares the dlpack_tensor.
Parameters
----------
dlpack_tensor : dlpack capsule
The dlpack tensor.
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
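A hedged round-trip sketch with the PyTorch backend; the shared-memory behaviour is what "zerocopy" refers to:

import torch as th
from dgl import backend as F

x = th.arange(6, dtype=th.float32)
capsule = F.zerocopy_to_dlpack(x)      # the capsule shares x's memory
y = F.zerocopy_from_dlpack(capsule)    # y also shares x's memory
y[0] = 42.0
assert x[0].item() == 42.0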
def zerocopy_to_numpy(input):
"""Create a numpy ndarray that shares the input memory.
Parameters
----------
input : Tensor
The input tensor
Returns
-------
numpy.ndarray
A numpy ndarray.
"""
pass
def zerocopy_from_numpy(np_array):
"""Create a tensor that shares the numpy array.
Parameters
----------
np_array : numpy.ndarray
The numpy ndarray.
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
###############################################################################
# Other interfaces
# ----------------
# These are not related to tensors. Some of them are temporary workarounds that
# should be included in DGL in the future.
def create_immutable_graph_index():
"""Create an immutable graph index object."""
pass
from __future__ import absolute_import
import numpy as np
import mxnet as mx
import mxnet.ndarray as F
import scipy.sparse
import ctypes
from .._ffi.base import _LIB, check_call, c_array
from .._ffi.runtime_ctypes import TVMType, TVMContext, TVMArray
from .._ffi.runtime_ctypes import TypeCode, tvm_shape_index_t
# Tensor types
Tensor = mx.nd.NDArray
SparseTensor = mx.nd.sparse.CSRNDArray
# Data types
float16 = np.float16
float32 = np.float32
float64 = np.float64
uint8 = np.uint8
int8 = np.int8
int16 = np.int16
int32 = np.int32
int64 = np.int64
# Operators
tensor = mx.nd.array
sum = F.sum
def max(x):
return F.max(x).asnumpy()[0]
def sparse_tensor(idx, data, shape):
return mx.nd.sparse.csr_matrix((data, (idx[0], idx[1])), tuple(shape))
def astype(a, ty):
return F.cast(a, ty)
def asnumpy(a):
return a.asnumpy()
def from_numpy(np_data):
return mx.nd.array(np_data, dtype=np_data.dtype)
def pack(tensors):
return F.concat(*tensors, dim=0)
def unpack(x, split_sizes_or_sections=1):
if isinstance(split_sizes_or_sections, list):
np_arr = x.asnumpy()
indices = np.cumsum(split_sizes_or_sections)
res = np.split(np_arr, indices[:-1])
return [tensor(arr, dtype=x.dtype) for arr in res]
else:
return F.split(x, split_sizes_or_sections)
# TODO this doesn't exist for symbol.
def shape(x):
return x.shape
def dtype(x):
return x.dtype
def isinteger(x):
return x.dtype in [np.int, np.int8, np.int16, np.int32, np.int64]
def unique(x):
# TODO this isn't the best way of running unique.
tmp = x.asnumpy()
tmp = np.unique(tmp)
return mx.nd.array(tmp, ctx=x.context, dtype=x.dtype)
def gather_row(data, row_index):
if isinstance(row_index, F.NDArray):
return F.take(data, row_index)
else:
return data[row_index,]
scatter_row = mx.nd.contrib.index_copy
def broadcast_to(x, to_array):
return x + F.zeros_like(to_array)
squeeze = F.squeeze
unsqueeze = F.expand_dims
# TODO this doesn't exist for symbol.
reshape = F.reshape
ones = F.ones
zeros = F.zeros
arange = F.arange
def spmm(spm, mat):
return mx.nd.dot(spm, mat)
def sort(x, dim=None, descending=False):
if dim is None:
dim = -1
ascend = not descending
# TODO this isn't an ideal implementation.
val = F.sort(x, axis=dim, is_ascend=ascend)
idx = F.argsort(x, axis=dim, is_ascend=ascend)
idx = F.cast(idx, dtype='int64')
return val, idx
def to_context(x, ctx):
if ctx is None:
return x
elif ctx.device_type == TVMContext.STR2MASK['cuda']:
return x.as_in_context(mx.gpu(ctx.device_id))
elif ctx.device_type == TVMContext.STR2MASK['cpu']:
return x.as_in_context(mx.cpu())
else:
raise RuntimeError('Invalid context', ctx)
def get_context(x):
if x.context.device_type == 'cpu':
return TVMContext(TVMContext.STR2MASK['cpu'], 0)
else:
return TVMContext(
TVMContext.STR2MASK[x.context.device_type], x.context.device_id)
def convert_to(src, dst):
'''
Convert src to the same dtype and context as dst
'''
return src.copyto(dst.context).astype(dst.dtype)
def _typestr(arr_dtype):
return arr_dtype
def get_tvmtype(arr):
arr_dtype = arr.dtype
if arr_dtype == np.float16:
return TVMType('float16')
elif arr_dtype == np.float32:
return TVMType('float32')
elif arr_dtype == np.float64:
return TVMType('float64')
elif arr_dtype == np.int16:
return TVMType('int16')
elif arr_dtype == np.int32:
return TVMType('int32')
elif arr_dtype == np.int64:
return TVMType('int64')
elif arr_dtype == np.int8:
return TVMType('int8')
elif arr_dtype == np.uint8:
return TVMType('uint8')
else:
raise RuntimeError('Unsupported data type:', arr_dtype)
def zerocopy_to_dlpack(arr):
"""Return a dlpack compatible array using zero copy."""
return arr.to_dlpack_for_read()
def zerocopy_from_dlpack(dlpack_arr):
"""Return a tensor using zero copy."""
return mx.nd.from_dlpack(dlpack_arr)
def zerocopy_to_numpy(arr):
"""Return a numpy array that shares the data."""
return arr.asnumpy()
def zerocopy_from_numpy(np_data):
"""Return a tensor that shares the numpy data."""
return mx.nd.array(np_data, dtype=np_data.dtype)
from .tensor import *
from .immutable_graph_index import create_immutable_graph_index
@@ -6,8 +6,6 @@ import networkx as nx
 import scipy.sparse as sp
 import mxnet as mx
-from .mxnet import to_context
 class ImmutableGraphIndex(object):
     """Backend-specific graph index object on immutable graphs.
     We can use a CSR matrix to represent a graph structure. For functionality,
...
from __future__ import absolute_import
import numpy as np
import mxnet as mx
import mxnet.ndarray as nd
def data_type_dict():
return {'float16' : np.float16,
'float32' : np.float32,
'float64' : np.float64,
'uint8' : np.uint8,
'int8' : np.int8,
'int16' : np.int16,
'int32' : np.int32,
'int64' : np.int64}
def cpu():
return mx.cpu()
def tensor(data, dtype=None):
return nd.array(data, dtype=dtype)
def sparse_matrix(data, index, shape, force_format=False):
fmt = index[0]
if fmt == 'coo':
if force_format:
raise TypeError('MXNet backend only supports CSR format,'
' but COO format is forced.')
coord = index[1]
return nd.sparse.csr_matrix((data, (coord[0], coord[1])), shape)
elif fmt == 'csr':
indices = index[1]
indptr = index[2]
return nd.sparse.csr_matrix((data, indices, indptr), shape)
else:
raise TypeError('Invalid format: %s.' % fmt)
def sparse_matrix_indices(spmat):
return ('csr', spmat.indices, spmat.indptr)
def is_tensor(obj):
return isinstance(obj, nd.NDArray)
def shape(input):
# NOTE: the input cannot be a symbol
return input.shape
def dtype(input):
# NOTE: the input cannot be a symbol
return input.dtype
def context(input):
return input.context
def astype(input, ty):
return nd.cast(input, ty)
def asnumpy(input):
return input.asnumpy()
def copy_to(input, ctx):
return input.as_in_context(ctx)
def sum(input, dim):
return nd.sum(input, axis=dim)
def max(input, dim):
return nd.max(input, axis=dim)
def cat(seq, dim):
return nd.concat(*seq, dim=dim)
def split(x, sizes_or_sections, dim):
if isinstance(sizes_or_sections, list):
# TODO: fallback to numpy is unfortunate
np_arr = x.asnumpy()
indices = np.cumsum(sizes_or_sections)[:-1]
res = np.split(np_arr, indices, axis=dim)
return [tensor(arr, dtype=x.dtype) for arr in res]
else:
return nd.split(x, sizes_or_sections, axis=dim)
def gather_row(data, row_index):
if isinstance(row_index, nd.NDArray):
return nd.take(data, row_index)
else:
return data[row_index,]
def narrow_row(data, start, stop):
return nd.slice(data, begin=start, end=stop)
def scatter_row(data, row_index, value):
return mx.nd.contrib.index_copy(data, row_index, value)
def scatter_row_inplace(data, row_index, value):
data[row_index] = value
def squeeze(input, dim):
return nd.squeeze(input, axis=dim)
def unsqueeze(input, dim):
return nd.expand_dims(input, axis=dim)
def reshape(input, shape):
# NOTE: the input cannot be a symbol
return nd.reshape(input, shape)
def zeros(shape, dtype):
return nd.zeros(shape, dtype=dtype)
def ones(shape, dtype):
return nd.ones(shape, dtype=dtype)
def spmm(x, y):
return nd.dot(x, y)
def unique(input):
# TODO: fallback to numpy is unfortunate
tmp = input.asnumpy()
tmp = np.unique(tmp)
return nd.array(tmp, ctx=input.context, dtype=input.dtype)
def full_1d(length, fill_value):
return nd.full((length,), fill_value)
def nonzero_1d(input):
# TODO: fallback to numpy is unfortunate
tmp = input.asnumpy()
tmp = np.nonzero(tmp)[0]
return nd.array(tmp, ctx=input.context, dtype=input.dtype)
def sort_1d(input):
# TODO: this isn't an ideal implementation.
val = nd.sort(input, is_ascend=True)
idx = nd.argsort(input, is_ascend=True)
idx = nd.cast(idx, dtype='int64')
return val, idx
def arange(start, stop):
return nd.arange(start, stop, dtype=np.int64)
def zerocopy_to_dlpack(arr):
return arr.to_dlpack_for_read()
def zerocopy_from_dlpack(dlpack_arr):
return nd.from_dlpack(dlpack_arr)
def zerocopy_to_numpy(arr):
# NOTE: not zerocopy
return arr.asnumpy()
def zerocopy_from_numpy(np_data):
# NOTE: not zerocopy
return nd.array(np_data, dtype=np_data.dtype)
from __future__ import absolute_import
import numpy as np
import scipy as sp
Tensor = np.ndarray
SparseTensor = sp.sparse.spmatrix
def asnumpy(a):
return a
def pack(arrays):
return np.concatenate(arrays, axis=0)
def unpack(a, split_size_or_sections=None):
if split_size_or_sections is None:
indices_or_sections = a.shape[0]
else:
# convert split size to split indices by cumsum
indices_or_sections = np.cumsum(split_size_or_sections)[:-1]
return np.split(a, indices_or_sections, axis=0)
def shape(a):
return a.shape
def nonzero_1d(a):
assert a.ndim == 2
return np.nonzero(a)[0]
from __future__ import absolute_import
import numpy as np
import scipy.sparse as sp
import warnings
warnings.warn('Detected numpy backend. Please be aware that numpy does not support autograd!')
def data_type_dict():
return {'float16' : np.float16,
'float32' : np.float32,
'float64' : np.float64,
'uint8' : np.uint8,
'int8' : np.int8,
'int16' : np.int16,
'int32' : np.int32,
'int64' : np.int64}
def cpu():
return 'cpu'
def tensor(data, dtype=None):
return np.array(data, dtype)
def sparse_matrix(data, index, shape, force_format=False):
fmt = index[0]
if fmt == 'coo':
i = index[1][0,:]
j = index[1][1,:]
return sp.coo_matrix((data, (i, j)), shape=shape)
elif fmt == 'csr':
indices = index[1]
indptr = index[2]
return sp.csr_matrix((data, indices, indptr), shape=shape)
else:
raise TypeError('Invalid format: %s.' % fmt)
def sparse_matrix_indices(spmat):
if spmat.format == 'coo':
return ('coo', np.stack([spmat.row, spmat.col]))
elif spmat.format == 'csr':
return ('csr', spmat.indices, spmat.indptr)
else:
raise TypeError('Invalid format: %s.' % spmat.format)
def is_tensor(obj):
return isinstance(obj, np.ndarray)
def shape(input):
return input.shape
def dtype(input):
return input.dtype
def context(input):
return 'cpu'
def astype(input, ty):
return input.astype(ty)
def asnumpy(input):
return input
def copy_to(input, ctx):
return input
def sum(input, dim):
return np.sum(input, axis=dim)
def max(input, dim):
return np.max(input, axis=dim)
def cat(seq, dim):
return np.concatenate(seq, axis=dim)
def split(input, sizes_or_sections, dim):
dimsize = input.shape[dim]
if isinstance(sizes_or_sections, int):
if dimsize % sizes_or_sections != 0:
raise ValueError('Require dimension %d to be equally splitted'
' to %d pieces, but got %d.' % (dim, sizes_or_sections, dimsize))
idx = np.arange(sizes_or_sections, dimsize, sizes_or_sections)
else:
idx = np.cumsum(sizes_or_sections)[0:-1]
return np.split(input, idx, axis=dim)
def gather_row(data, row_index):
return data[row_index]
def scatter_row(data, row_index, value):
# NOTE: inplace instead of out-place
data[row_index] = value
return data
def scatter_row_inplace(data, row_index, value):
data[row_index] = value
def squeeze(input, dim):
return np.squeeze(input, dim)
def unsqueeze(input, dim):
return np.expand_dims(input, dim)
def reshape(input, shape):
return np.reshape(input, shape)
def zeros(shape, dtype):
return np.zeros(shape, dtype=dtype)
def ones(shape, dtype):
return np.ones(shape, dtype=dtype)
def spmm(x, y):
return x.dot(y)
def unique(input):
return np.unique(input)
def full_1d(length, fill_value):
return np.full((length,), fill_value)
def nonzero_1d(input):
return np.nonzero(input)[0]
def sort_1d(input):
return np.sort(input), np.argsort(input)
def arange(start, stop):
return np.arange(start, stop, dtype=np.int64)
# zerocopy_to_dlpack not enabled
# zerocopy_from_dlpack not enabled
def zerocopy_to_numpy(input):
return input
def zerocopy_from_numpy(np_array):
return np_array
# create_immutable_graph_index not enabled
from __future__ import absolute_import
import ctypes
import scipy as sp
import torch as th
from torch.utils import dlpack
from .._ffi.base import _LIB, check_call, c_array
from .._ffi.runtime_ctypes import TVMType, TVMContext, TVMArray
from .._ffi.runtime_ctypes import TypeCode, tvm_shape_index_t
from .. import ndarray as nd
# Tensor types
Tensor = th.Tensor
SparseTensor = th.sparse.FloatTensor
# Data types
float16 = th.float16
float32 = th.float32
float64 = th.float64
uint8 = th.uint8
int8 = th.int8
int16 = th.int16
int32 = th.int32
int64 = th.int64
# Operators
tensor = th.tensor
sparse_tensor = th.sparse.FloatTensor
sum = th.sum
max = th.max
stack = th.stack
def astype(a, ty):
return a.type(ty)
def asnumpy(a):
return a.cpu().numpy()
def from_numpy(np_data):
return th.from_numpy(np_data)
def pack(tensors, dim=0):
return th.cat(tensors, dim)
def unpack(x, indices_or_sections=1):
return th.split(x, indices_or_sections)
def shape(x):
return x.shape
def dtype(x):
return x.dtype
unique = th.unique
def gather_row(data, row_index):
return th.index_select(data, 0, row_index)
def scatter_row(data, row_index, value):
return data.index_copy(0, row_index, value)
def broadcast_to(x, to_array):
return x + th.zeros_like(to_array)
nonzero = th.nonzero
squeeze = th.squeeze
unsqueeze = th.unsqueeze
reshape = th.reshape
zeros = th.zeros
ones = th.ones
zeros = th.zeros
spmm = th.spmm
sort = th.sort
arange = th.arange
mul = th.mul
def to_context(arr, ctx):
if ctx is None:
return arr
elif ctx.device_type == TVMContext.STR2MASK['cuda']:
th.cuda.set_device(ctx.device_id)
return arr.cuda()
elif ctx.device_type == TVMContext.STR2MASK['cpu']:
return arr.cpu()
else:
raise RuntimeError('Invalid context', ctx)
def _get_context(type, index):
if type == 'cpu':
return TVMContext(TVMContext.STR2MASK['cpu'], 0)
else:
return TVMContext(TVMContext.STR2MASK[type], index)
def get_context(arr):
return _get_context(arr.device.type, arr.device.index)
_tvmtypes = {
th.float16: TVMType('float16'),
th.float32: TVMType('float32'),
th.float64: TVMType('float64'),
th.int8: TVMType('int8'),
th.uint8: TVMType('uint8'),
th.int16: TVMType('int16'),
th.int32: TVMType('int32'),
th.int64: TVMType('int64'),
}
def convert_to(src, dst):
'''
Convert src to the same dtype and context as dst
'''
return src.to(dst)
def get_tvmtype(arr):
arr_dtype = arr.dtype
if arr_dtype not in _tvmtypes:
raise RuntimeError('Unsupported data type:', arr_dtype)
return _tvmtypes[arr_dtype]
def zerocopy_to_dlpack(arr):
"""Return a dlpack compatible array using zero copy."""
return dlpack.to_dlpack(arr)
def zerocopy_from_dlpack(dlpack_arr):
"""Return a tensor using zero copy."""
return dlpack.from_dlpack(dlpack_arr)
def zerocopy_to_numpy(arr):
"""Return a numpy array that shares the data."""
return arr.numpy()
def zerocopy_from_numpy(np_data):
"""Return a tensor that shares the numpy data."""
return th.from_numpy(np_data)
def nonzero_1d(arr):
"""Return a 1D tensor with nonzero element indices in a 1D vector"""
assert arr.dim() == 1
return th.nonzero(arr)[:, 0]
from __future__ import absolute_import
import torch as th
from torch.utils import dlpack
def data_type_dict():
return {'float16' : th.float16,
'float32' : th.float32,
'float64' : th.float64,
'uint8' : th.uint8,
'int8' : th.int8,
'int16' : th.int16,
'int32' : th.int32,
'int64' : th.int64}
def cpu():
return th.device('cpu')
def tensor(data, dtype=None):
return th.tensor(data, dtype=dtype)
def sparse_matrix(data, index, shape, force_format=False):
fmt = index[0]
if fmt != 'coo':
raise TypeError('Pytorch backend only supports COO format. But got %s.' % fmt)
return th.sparse.FloatTensor(index[1], data, shape)
def sparse_matrix_indices(spmat):
return ('coo', spmat._indices())
def is_tensor(obj):
return isinstance(obj, th.Tensor)
def shape(input):
return input.shape
def dtype(input):
return input.dtype
def context(input):
return input.device
def astype(input, ty):
return input.type(ty)
def asnumpy(input):
return input.cpu().numpy()
def copy_to(input, ctx):
if ctx.type == 'cpu':
return input.cpu()
elif ctx.type == 'cuda':
th.cuda.set_device(ctx.index)
return input.cuda()
else:
raise RuntimeError('Invalid context', ctx)
def sum(input, dim):
return th.sum(input, dim=dim)
def max(input, dim):
# NOTE: the second argmax array is not returned
return th.max(input, dim=dim)[0]
def cat(seq, dim):
return th.cat(seq, dim=dim)
def split(input, sizes_or_sections, dim):
return th.split(input, sizes_or_sections, dim)
def gather_row(data, row_index):
return th.index_select(data, 0, row_index)
def narrow_row(x, start, stop):
return x[start:stop]
def scatter_row(data, row_index, value):
return data.index_copy(0, row_index, value)
def scatter_row_inplace(data, row_index, value):
data[row_index] = value
def squeeze(input, dim):
return th.squeeze(input, dim)
def unsqueeze(input, dim):
return th.unsqueeze(input, dim)
def reshape(input, shape):
return th.reshape(input, shape)
def zeros(shape, dtype):
return th.zeros(shape, dtype=dtype)
def ones(shape, dtype):
return th.ones(shape, dtype=dtype)
def spmm(x, y):
return th.spmm(x, y)
def unique(input):
return th.unique(input)
def full_1d(length, fill_value):
return th.full((length,), fill_value)
def nonzero_1d(input):
return th.nonzero(input).squeeze()
def sort_1d(input):
return th.sort(input)
def arange(start, stop):
return th.arange(start, stop, dtype=th.int64)
def zerocopy_to_dlpack(input):
return dlpack.to_dlpack(input)
def zerocopy_from_dlpack(dlpack_tensor):
return dlpack.from_dlpack(dlpack_tensor)
def zerocopy_to_numpy(input):
# NOTE: not zerocopy
return asnumpy(input)
def zerocopy_from_numpy(np_array):
return th.from_numpy(np_array)
# create_immutable_graph_index not enabled
@@ -31,11 +31,11 @@ class BatchedDGLGraph(DGLGraph):
         batched_index = gi.disjoint_union([g._graph for g in graph_list])
         # create batched node and edge frames
         # NOTE: following code will materialize the columns of the input graphs.
-        cols = {key: F.pack([gr._node_frame[key] for gr in graph_list])
+        cols = {key: F.cat([gr._node_frame[key] for gr in graph_list], dim=0)
                 for key in node_attrs}
         batched_node_frame = FrameRef(Frame(cols))
-        cols = {key: F.pack([gr._edge_frame[key] for gr in graph_list])
+        cols = {key: F.cat([gr._edge_frame[key] for gr in graph_list], dim=0)
                 for key in edge_attrs}
         batched_edge_frame = FrameRef(Frame(cols))
@@ -117,31 +117,6 @@ class BatchedDGLGraph(DGLGraph):
         # TODO
         pass
-    '''
-    def query_new_node(self, g, u):
-        idx = self.graph_idx[g]
-        offset = self.node_offset[idx]
-        if isinstance(u, (int, np.array, F.Tensor)):
-            return u + offset
-        else:
-            return np.array(u) + offset
-    def query_new_edge(self, g, src, dst):
-        idx = self.graph_idx[g]
-        offset = self.node_offset[idx]
-        if isinstance(src, (int, np.ndarray, F.Tensor)) and \
-           isinstance(dst, (int, np.ndarray, F.Tensor)):
-            return src + offset, dst + offset
-        else:
-            return np.array(src) + offset, np.array(dst) + offset
-    def query_node_start_offset(self):
-        return self.node_offset[:-1].copy()
-    def query_edge_start_offset(self):
-        return self.edge_offset[:-1].copy()
-    '''
 def split(graph_batch, num_or_size_splits):
     """Split the batch."""
     # TODO(minjie): could follow torch.split syntax
@@ -172,11 +147,11 @@ def unbatch(graph):
     node_frames = [FrameRef() for i in range(bsize)]
     edge_frames = [FrameRef() for i in range(bsize)]
     for attr, col in graph._node_frame.items():
-        col_splits = F.unpack(col, bn)
+        col_splits = F.split(col, bn, dim=0)
         for i in range(bsize):
             node_frames[i][attr] = col_splits[i]
     for attr, col in graph._edge_frame.items():
-        col_splits = F.unpack(col, be)
+        col_splits = F.split(col, be, dim=0)
         for i in range(bsize):
             edge_frames[i][attr] = col_splits[i]
     return [DGLGraph(graph_data=pttns[i],
...
@@ -93,8 +93,8 @@ class SBMMixture:
         for g, adj in zip(self._gs, adjs):
             g.from_scipy_sparse_matrix(adj)
         self._lgs = [g.line_graph(backtracking=False) for g in self._gs]
-        in_degrees = lambda g: g.in_degrees(Index(F.arange(g.number_of_nodes(),
-                                                           dtype=F.int64))).unsqueeze(1).float()
+        in_degrees = lambda g: g.in_degrees(
+            Index(F.arange(0, g.number_of_nodes()))).unsqueeze(1).float()
         self._g_degs = [in_degrees(g) for g in self._gs]
         self._lg_degs = [in_degrees(lg) for lg in self._lgs]
         self._pm_pds = list(zip(*[g.edges() for g in self._gs]))[0]
...
@@ -5,7 +5,6 @@ from collections import MutableMapping, namedtuple
 import numpy as np
 from . import backend as F
-from .backend import Tensor
 from .base import DGLError, dgl_warning
 from . import utils
@@ -23,7 +22,7 @@ class Scheme(namedtuple('Scheme', ['shape', 'dtype'])):
     pass
 def infer_scheme(tensor):
-    return Scheme(tuple(F.shape(tensor)[1:]), F.get_tvmtype(tensor))
+    return Scheme(tuple(F.shape(tensor)[1:]), F.dtype(tensor))
 class Column(object):
     """A column is a compact store of features of multiple nodes/edges.
@@ -66,7 +65,7 @@ class Column(object):
         if isinstance(idx, slice):
             return self.data[idx]
         else:
-            user_idx = idx.tousertensor(F.get_context(self.data))
+            user_idx = idx.tousertensor(F.context(self.data))
             return F.gather_row(self.data, user_idx)
     def __setitem__(self, idx, feats):
@@ -102,28 +101,29 @@ class Column(object):
                            % (feat_scheme, self.scheme))
         if isinstance(idx, utils.Index):
-            idx = idx.tousertensor(F.get_context(self.data))
+            idx = idx.tousertensor(F.context(self.data))
         if inplace:
-            # TODO(minjie): do not use [] operator directly
-            self.data[idx] = feats
+            F.scatter_row_inplace(self.data, idx, feats)
         else:
             if isinstance(idx, slice):
                 # for contiguous indices pack is usually faster than scatter row
-                self.data = F.pack([
-                    self.data[:idx.start],
-                    feats,
-                    self.data[idx.stop:],
-                ])
+                part1 = F.narrow_row(self.data, 0, idx.start)
+                part2 = feats
+                part3 = F.narrow_row(self.data, idx.stop, len(self))
+                self.data = F.cat([part1, part2, part3], dim=0)
             else:
                 self.data = F.scatter_row(self.data, idx, feats)
     def extend(self, feats, feat_scheme=None):
         """Extend the feature data.
         Parameters
         ----------
         feats : Tensor
             The new features.
+        feat_scheme : Scheme, optional
+            The scheme
         """
         if feat_scheme is None:
             feat_scheme = Scheme.infer_scheme(feats)
@@ -132,8 +132,8 @@ class Column(object):
             raise DGLError("Cannot update column of scheme %s using feature of scheme %s."
                            % (feat_scheme, self.scheme))
-        feats = F.convert_to(feats, self.data)
-        self.data = F.pack([self.data, feats])
+        feats = F.copy_to(feats, F.context(self.data))
+        self.data = F.cat([self.data, feats], dim=0)
     @staticmethod
     def create(data):
@@ -183,8 +183,7 @@ class Frame(MutableMapping):
         dgl_warning('Initializer is not set. Use zero initializer instead.'
                     ' To suppress this warning, use `set_initializer` to'
                     ' explicitly specify which initializer to use.')
-        # TODO(minjie): handle data type
-        self._initializer = lambda shape, dtype: F.zeros(shape)
+        self._initializer = lambda shape, dtype: F.zeros(shape, dtype)
     def set_initializer(self, initializer):
         """Set the initializer for empty values.
@@ -286,7 +285,7 @@ class Frame(MutableMapping):
             self._warn_and_set_initializer()
         # TODO(minjie): directly init data on the targer device.
         init_data = self.initializer((self.num_rows,) + scheme.shape, scheme.dtype)
-        init_data = F.to_context(init_data, ctx)
+        init_data = F.copy_to(init_data, ctx)
         self._columns[name] = Column(init_data, scheme)
     def update_column(self, name, data):
@@ -421,10 +420,8 @@ class FrameRef(MutableMapping):
         if self._index is None:
             if self.is_contiguous():
                 self._index = utils.toindex(
-                    F.arange(
-                        self._index_data.start,
-                        self._index_data.stop,
-                        dtype=F.int64))
+                    F.arange(self._index_data.start,
+                             self._index_data.stop))
             else:
                 self._index = utils.toindex(self._index_data)
         return self._index
@@ -583,7 +580,7 @@ class FrameRef(MutableMapping):
             self._frame[name] = col
         else:
             if name not in self._frame:
-                ctx = F.get_context(data)
+                ctx = F.context(data)
                 self._frame.add_column(name, infer_scheme(data), ctx)
             fcol = self._frame[name]
             fcol.update(self.index_or_slice(), data, inplace)
@@ -781,7 +778,7 @@ def merge_frames(frames, indices, max_index, reduce_func):
     m = len(row)
     row = F.unsqueeze(F.tensor(row, dtype=F.int64), 0)
     col = F.unsqueeze(F.tensor(col, dtype=F.int64), 0)
-    idx = F.pack([row, col])
+    idx = F.cat([row, col], dim=0)
     dat = F.ones((m,))
     adjmat = F.sparse_tensor(idx, dat, [n, m])
     ctx_adjmat = utils.CtxCachedObject(lambda ctx: F.to_context(adjmat, ctx))
...
@@ -8,7 +8,6 @@ import numpy as np
 import dgl
 from .base import ALL, is_all, DGLError, dgl_warning
 from . import backend as F
-from .backend import Tensor
 from .frame import FrameRef, Frame, merge_frames
 from .function.message import BundledMessageFunction
 from .function.reducer import BundledReduceFunction
@@ -535,8 +534,8 @@ class DGLGraph(object):
         self._msg_graph.add_nodes(self._graph.number_of_nodes())
         # copy attributes
         def _batcher(lst):
-            if isinstance(lst[0], Tensor):
-                return F.pack([F.unsqueeze(x, 0) for x in lst])
+            if F.is_tensor(lst[0]):
+                return F.cat([F.unsqueeze(x, 0) for x in lst], dim=0)
             else:
                 return F.tensor(lst)
         if node_attrs is not None:
@@ -988,7 +987,7 @@ class DGLGraph(object):
         v_is_all = is_all(v)
         if v_is_all:
-            v = F.arange(0, self.number_of_nodes(), dtype=F.int64)
+            v = F.arange(0, self.number_of_nodes())
         elif isinstance(v, int):
             v = [v]
         v = utils.toindex(v)
@@ -1029,14 +1028,14 @@ class DGLGraph(object):
             self.reset_messages()
         # Pack all reducer results together
-        reordered_v = F.pack(reordered_v)
+        reordered_v = F.cat(reordered_v, dim=0)
         keys = new_reprs[0].keys()
-        new_reprs = {key : F.pack([repr[key] for repr in new_reprs])
+        new_reprs = {key : F.cat([repr[key] for repr in new_reprs], dim=0)
                      for key in keys}
         if v_is_all and not has_zero_degree:
             # First do reorder and then replace the whole column.
-            _, indices = F.sort(reordered_v)
+            _, indices = F.sort_1d(reordered_v)
             indices = utils.toindex(indices)
             new_reprs = utils.reorder(new_reprs, indices)
             self.set_n_repr(new_reprs)
@@ -1415,7 +1414,7 @@ class DGLGraph(object):
         if is_all(nodes):
             return F.nonzero_1d(n_mask)
         else:
-            nodes = F.Tensor(nodes)
+            nodes = F.tensor(nodes)
             return nodes[n_mask]
     def filter_edges(self, predicate, edges=ALL):
@@ -1443,7 +1442,7 @@ class DGLGraph(object):
         if is_all(edges):
             return F.nonzero_1d(e_mask)
         else:
-            edges = F.Tensor(edges)
+            edges = F.tensor(edges)
             return edges[e_mask]
     def _internal_apply_nodes(self, v, apply_node_func="default", reduce_accum=None):
...
@@ -494,13 +494,14 @@ class GraphIndex(object):
         src = F.unsqueeze(src.tousertensor(), 0)
         dst = F.unsqueeze(dst.tousertensor(), 0)
         if transpose:
-            idx = F.pack([src, dst])
+            idx = F.cat([src, dst], dim=0)
         else:
-            idx = F.pack([dst, src])
+            idx = F.cat([dst, src], dim=0)
         n = self.number_of_nodes()
-        dat = F.ones((self.number_of_edges(),))
-        mat = F.sparse_tensor(idx, dat, [n, n])
-        self._cache['adj'] = utils.CtxCachedObject(lambda ctx: F.to_context(mat, ctx))
+        # FIXME(minjie): data type
+        dat = F.ones((self.number_of_edges(),), dtype=F.float32)
+        mat = F.sparse_matrix(dat, ('coo', idx), (n, n))
+        self._cache['adj'] = utils.CtxCachedObject(lambda ctx: F.copy_to(mat, ctx))
         return self._cache['adj']
     def incidence_matrix(self, oriented=False):
@@ -522,25 +523,27 @@ class GraphIndex(object):
         src = src.tousertensor()
         dst = dst.tousertensor()
         m = self.number_of_edges()
-        eid = F.arange(m, dtype=F.int64)
-        row = F.pack([src, dst])
-        col = F.pack([eid, eid])
-        idx = F.stack([row, col])
+        eid = F.arange(0, m)
+        row = F.unsqueeze(F.cat([src, dst], dim=0), 0)
+        col = F.unsqueeze(F.cat([eid, eid], dim=0), 0)
+        idx = F.cat([row, col], dim=0)
         diagonal = (src == dst)
         if oriented:
-            x = -F.ones((m,))
-            y = F.ones((m,))
+            # FIXME(minjie): data type
+            x = -F.ones((m,), dtype=F.float32)
+            y = F.ones((m,), dtype=F.float32)
             x[diagonal] = 0
             y[diagonal] = 0
-            dat = F.pack([x, y])
+            dat = F.cat([x, y], dim=0)
         else:
-            x = F.ones((m,))
+            # FIXME(minjie): data type
+            x = F.ones((m,), dtype=F.float32)
             x[diagonal] = 0
-            dat = F.pack([x, x])
+            dat = F.cat([x, x], dim=0)
         n = self.number_of_nodes()
-        mat = F.sparse_tensor(idx, dat, [n, m])
-        self._cache[key] = utils.CtxCachedObject(lambda ctx: F.to_context(mat, ctx))
+        mat = F.sparse_matrix(dat, ('coo', idx), (n, m))
+        self._cache[key] = utils.CtxCachedObject(lambda ctx: F.copy_to(mat, ctx))
         return self._cache[key]
...
@@ -132,8 +132,8 @@ class ImmutableGraphIndex(object):
         bool
             True if the edge exists
         """
-        u = F.tensor([u])
-        v = F.tensor([v])
+        u = F.tensor([u], dtype=F.int64)
+        v = F.tensor([v], dtype=F.int64)
         return self._sparse.has_edges(u, v).asnumpy()[0]
     def has_edges_between(self, u, v):
@@ -151,7 +151,8 @@ class ImmutableGraphIndex(object):
         utils.Index
             0-1 array indicating existence
         """
-        return utils.toindex(self._sparse.has_edges(u.tousertensor(), v.tousertensor()))
+        ret = self._sparse.has_edges(u.tousertensor(), v.tousertensor())
+        return utils.toindex(ret)
     def predecessors(self, v, radius=1):
         """Return the predecessors of the node.
@@ -204,8 +205,8 @@ class ImmutableGraphIndex(object):
         int
             The edge id.
         """
-        u = F.tensor([u])
-        v = F.tensor([v])
+        u = F.tensor([u], dtype=F.int64)
+        v = F.tensor([v], dtype=F.int64)
         id = self._sparse.edge_ids(u, v)
         return utils.toindex(id)
@@ -434,7 +435,7 @@ class ImmutableGraphIndex(object):
         """
         def get_adj(ctx):
             new_mat = self._sparse.adjacency_matrix(transpose)
-            return F.to_context(new_mat, ctx)
+            return F.copy_to(new_mat, ctx)
         if not transpose and 'in_adj' in self._cache:
             return self._cache['in_adj']
...
@@ -56,8 +56,8 @@ def _process_buckets(buckets):
     # split buckets
     unique_v = v.tousertensor()
     msg_ids = msg_ids.tousertensor()
-    dsts = F.unpack(unique_v, v_section)
-    msg_ids = F.unpack(msg_ids, msg_section)
+    dsts = F.split(unique_v, v_section, dim=0)
+    msg_ids = F.split(msg_ids, msg_section, dim=0)
     # convert to utils.Index
     unique_v = utils.toindex(unique_v)
@@ -136,7 +136,7 @@ class SPMVOperator(Executor):
     def run(self):
         # get src col
         srccol = self.node_repr[self.src_field]
-        ctx = F.get_context(srccol)
+        ctx = F.context(srccol)
         # build adjmat
         adjmat = self.adj_build_fn(self.edge_field, ctx, self.use_edge_feat)
@@ -145,7 +145,7 @@ class SPMVOperator(Executor):
         if len(F.shape(srccol)) == 1:
             srccol = F.unsqueeze(srccol, 1)
             dstcol = F.spmm(adjmat, srccol)
-            dstcol = F.squeeze(dstcol)
+            dstcol = F.squeeze(dstcol, 1)
         else:
             dstcol = F.spmm(adjmat, srccol)
         return {self.dst_field : dstcol}
@@ -190,7 +190,7 @@ class DegreeBucketingExecutor(Executor):
         # Pack all reducer results together
         keys = new_reprs[0].keys()
-        new_reprs = {key : F.pack([repr[key] for repr in new_reprs])
+        new_reprs = {key : F.cat([repr[key] for repr in new_reprs], dim=0)
                      for key in keys}
         return new_reprs
@@ -273,10 +273,11 @@ class UpdateAllExecutor(BasicExecutor):
     def _adj_build_fn(self, edge_field, ctx, use_edge_feat):
         if use_edge_feat:
             dat = self.edge_repr[edge_field]
-            dat = F.squeeze(dat)
-            # TODO(minjie): should not directly use _indices
-            idx = self.g.adjacency_matrix(ctx)._indices()
-            adjmat = F.sparse_tensor(idx, dat, self.graph_shape)
+            if len(F.shape(dat)) > 1:
+                # The edge feature is of shape (N, 1)
+                dat = F.squeeze(dat, 1)
+            idx = F.sparse_matrix_indices(self.g.adjacency_matrix(ctx))
+            adjmat = F.sparse_matrix(dat, idx, self.graph_shape)
         else:
             adjmat = self.g.adjacency_matrix(ctx)
         return adjmat
@@ -334,18 +335,22 @@ class SendRecvExecutor(BasicExecutor):
         new_v = old2new[v]
         n = self.g.number_of_nodes()
         m = len(new2old)
-        self._graph_idx = F.pack([F.unsqueeze(new_v, 0), F.unsqueeze(u, 0)])
+        self._graph_idx = F.cat(
+            [F.unsqueeze(new_v, 0), F.unsqueeze(u, 0)], dim=0)
         self._graph_shape = [m, n]
         self._recv_nodes = new2old
     def _adj_build_fn(self, edge_field, ctx, use_edge_feat):
         if use_edge_feat:
             dat = self.edge_repr[edge_field]
-            dat = F.squeeze(dat)
+            if len(F.shape(dat)) > 1:
+                # edge feature is of shape (N, 1)
+                dat = F.squeeze(dat, dim=1)
         else:
-            dat = F.ones((len(self.u), ))
-        adjmat = F.sparse_tensor(self.graph_idx, dat, self.graph_shape)
-        return F.to_context(adjmat, ctx)
+            # TODO(minjie): data type should be adjusted according to the usage.
+            dat = F.ones((len(self.u), ), dtype=F.float32)
+        adjmat = F.sparse_matrix(dat, ('coo', self.graph_idx), self.graph_shape)
+        return F.copy_to(adjmat, ctx)
 class BundledExecutor(BasicExecutor):
...
@@ -73,7 +73,7 @@ class EdgeBatch(object):
         """
         if is_all(self._edges[2]):
             self._edges[2] = utils.toindex(F.arange(
-                0, self._g.number_of_edges(), dtype=F.int64))
+                0, self._g.number_of_edges()))
         u, v, eid = self._edges
         return (u.tousertensor(), v.tousertensor(), eid.tousertensor())
@@ -139,7 +139,7 @@ class NodeBatch(object):
         """
         if is_all(self._nodes):
             self._nodes = utils.toindex(F.arange(
-                0, self._g.number_of_nodes(), dtype=F.int64))
+                0, self._g.number_of_nodes()))
         return self._nodes.tousertensor()
     def batch_size(self):
...