Unverified commit ca2a7e1c authored by Minjie Wang, committed by GitHub

[Refactor] Nodeflow, sampling, CAPI (#430)

* enable cython

* add helper function and data structure for void_p vector return

* move sampler from graph index to contrib.sampling

* WIP

* WIP

* refactor layer sampling

* pass tests

* fix lint

* fix graphsage

* remove comments

* pickle test

* fix comments

* update dev guide for cython build
parent 27e0e547
@@ -112,6 +112,12 @@ To achieve this, export following environment variables:
 export DGL_LIBRARY_PATH=$DGL_HOME/build
 export PYTHONPATH=$PYTHONPATH:$DGL_HOME/python
+If you are working on a performance-critical part, you may want to turn on the Cython build:
+
+.. code-block:: bash
+
+   cd python
+   python setup.py build_ext --inplace
 
 You could test the build by running the following command and see the path of your local clone.
 
 .. code-block:: bash
......
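As a quick sanity check after the Cython build (a sketch, not part of the guide; the guide's own verification command is elided above), you can confirm which copy of dgl the interpreter resolves:

```python
# After exporting PYTHONPATH as above, the package should resolve to the
# local clone rather than an installed wheel.
import dgl
print(dgl.__path__)  # expect a path under $DGL_HOME/python
```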
@@ -10,8 +10,6 @@ import dgl
 import dgl.function as fn
 from dgl import DGLGraph
 from dgl.data import register_data_args, load_data
-from dgl.graph_index import map_to_nodeflow_nid
-
 
 class GraphSAGELayer(gluon.Block):
     def __init__(self,
@@ -116,7 +114,7 @@ class GraphSAGETrain(gluon.Block):
         for i, layer in enumerate(self.layers):
             parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1))
-            layer_nid = map_to_nodeflow_nid(nf._graph, i, parent_nid).tousertensor()
+            layer_nid = nf.map_from_parent_nid(i, parent_nid)
             self_h = h[layer_nid]
             # activation from previous layer of myself, used in graphSAGE
             nf.layers[i+1].data['self_h'] = self_h
@@ -170,7 +168,7 @@ class GraphSAGEInfer(gluon.Block):
         for i, layer in enumerate(self.layers):
             nf.layers[i].data['h'] = h
             parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1))
-            layer_nid = map_to_nodeflow_nid(nf._graph, i, parent_nid).tousertensor()
+            layer_nid = nf.map_from_parent_nid(i, parent_nid)
             # activation from previous layer of the nodes in (i+1)-th layer, used in graphSAGE
             self_h = h[layer_nid]
             nf.layers[i+1].data['self_h'] = self_h
......
@@ -57,7 +57,8 @@ class SamplerOp {
    * \param add_self_loop whether to add self loop to the sampled subgraph
    * \return a NodeFlow graph.
    */
-  static NodeFlow NeighborUniformSample(const ImmutableGraph *graph, IdArray seeds,
+  static NodeFlow NeighborUniformSample(const ImmutableGraph *graph,
+                                        const std::vector<dgl_id_t>& seeds,
                                         const std::string &edge_type,
                                         int num_hops, int expand_factor,
                                         const bool add_self_loop);
@@ -72,9 +73,10 @@ class SamplerOp {
    * \param layer_sizes The size of layers.
    * \return a NodeFlow graph.
    */
-  static NodeFlow LayerUniformSample(const ImmutableGraph *graph, IdArray seed_array,
+  static NodeFlow LayerUniformSample(const ImmutableGraph *graph,
+                                     const std::vector<dgl_id_t>& seeds,
                                      const std::string &neigh_type,
-                                     const std::vector<size_t> &layer_sizes);
+                                     IdArray layer_sizes);
 
   /*!
    * \brief Batch-generate random walk traces
......
@@ -11,7 +11,8 @@ from .base import ALL
 from .backend import load_backend
 from .batched_graph import *
 from .graph import DGLGraph
+from .nodeflow import *
 from .traversal import *
+from .transform import *
 from .propagate import *
 from .udf import NodeBatch, EdgeBatch
-from .transform import *
@@ -104,17 +104,18 @@ cdef extern from "dgl/runtime/c_runtime_api.h":
                                DLManagedTensor** out)
     void DGLDLManagedTensorCallDeleter(DLManagedTensor* dltensor)
 
-cdef extern from "dgl/c_dsl_api.h":
-    int DGLNodeFree(NodeHandle handle)
-    int DGLNodeTypeKey2Index(const char* type_key,
-                             int* out_index)
-    int DGLNodeGetTypeIndex(NodeHandle handle,
-                            int* out_index)
-    int DGLNodeGetAttr(NodeHandle handle,
-                       const char* key,
-                       DGLValue* out_value,
-                       int* out_type_code,
-                       int* out_success)
+# (minjie): Node and class module are not used in DGL.
+#cdef extern from "dgl/c_dsl_api.h":
+#    int DGLNodeFree(NodeHandle handle)
+#    int DGLNodeTypeKey2Index(const char* type_key,
+#                             int* out_index)
+#    int DGLNodeGetTypeIndex(NodeHandle handle,
+#                            int* out_index)
+#    int DGLNodeGetAttr(NodeHandle handle,
+#                       const char* key,
+#                       DGLValue* out_value,
+#                       int* out_type_code,
+#                       int* out_success)
 
 cdef inline py_str(const char* x):
     if PY_MAJOR_VERSION < 3:
......
include "./base.pxi" include "./base.pxi"
include "./node.pxi" # (minjie): Node and class module are not used in DGL.
#include "./node.pxi"
include "./function.pxi" include "./function.pxi"
include "./ndarray.pxi" include "./ndarray.pxi"
@@ -3,7 +3,8 @@ import traceback
 from cpython cimport Py_INCREF, Py_DECREF
 from numbers import Number, Integral
 from ..base import string_types
-from ..node_generic import convert_to_node, NodeGeneric
+# (minjie): Node and class module are not used in DGL.
+# from ..node_generic import convert_to_node, NodeGeneric
 from ..runtime_ctypes import DGLType, DGLContext, DGLByteArray
@@ -24,8 +25,9 @@ cdef int dgl_callback(DGLValue* args,
     for i in range(num_args):
         value = args[i]
         tcode = type_codes[i]
-        if (tcode == kNodeHandle or
-            tcode == kFuncHandle or
+        # (minjie): Node and class module are not used in DGL.
+        #if (tcode == kNodeHandle or
+        if (tcode == kFuncHandle or
             tcode == kModuleHandle or
             tcode > kExtBegin):
             CALL(DGLCbArgToReturn(&value, tcode))
@@ -79,10 +81,11 @@ cdef inline int make_arg(object arg,
                          list temp_args) except -1:
     """Pack arguments into c args dgl call accept"""
     cdef unsigned long long ptr
-    if isinstance(arg, NodeBase):
-        value[0].v_handle = (<NodeBase>arg).chandle
-        tcode[0] = kNodeHandle
-    elif isinstance(arg, NDArrayBase):
+    # (minjie): Node and class module are not used in DGL.
+    #if isinstance(arg, NodeBase):
+    #    value[0].v_handle = (<NodeBase>arg).chandle
+    #    tcode[0] = kNodeHandle
+    if isinstance(arg, NDArrayBase):
         value[0].v_handle = (<NDArrayBase>arg).chandle
         tcode[0] = (kNDArrayContainer if
                     not (<NDArrayBase>arg).c_is_view else kArrayHandle)
@@ -131,14 +134,15 @@ cdef inline int make_arg(object arg,
         value[0].v_str = tstr
         tcode[0] = kStr
         temp_args.append(tstr)
-    elif isinstance(arg, (list, tuple, dict, NodeGeneric)):
-        arg = convert_to_node(arg)
-        value[0].v_handle = (<NodeBase>arg).chandle
-        tcode[0] = kNodeHandle
-        temp_args.append(arg)
-    elif isinstance(arg, _CLASS_MODULE):
-        value[0].v_handle = c_handle(arg.handle)
-        tcode[0] = kModuleHandle
+    # (minjie): Node and class module are not used in DGL.
+    #elif isinstance(arg, (list, tuple, dict, NodeGeneric)):
+    #    arg = convert_to_node(arg)
+    #    value[0].v_handle = (<NodeBase>arg).chandle
+    #    tcode[0] = kNodeHandle
+    #    temp_args.append(arg)
+    #elif isinstance(arg, _CLASS_MODULE):
+    #    value[0].v_handle = c_handle(arg.handle)
+    #    tcode[0] = kModuleHandle
     elif isinstance(arg, FunctionBase):
         value[0].v_handle = (<FunctionBase>arg).chandle
         tcode[0] = kFuncHandle
@@ -166,9 +170,10 @@ cdef inline bytearray make_ret_bytes(void* chandle):
 cdef inline object make_ret(DGLValue value, int tcode):
     """convert result to return value."""
-    if tcode == kNodeHandle:
-        return make_ret_node(value.v_handle)
-    elif tcode == kNull:
+    # (minjie): Node and class module are not used in DGL.
+    #if tcode == kNodeHandle:
+    #    return make_ret_node(value.v_handle)
+    if tcode == kNull:
         return None
     elif tcode == kInt:
         return value.v_int64
@@ -184,8 +189,9 @@ cdef inline object make_ret(DGLValue value, int tcode):
         return ctypes_handle(value.v_handle)
     elif tcode == kDGLContext:
         return DGLContext(value.v_ctx.device_type, value.v_ctx.device_id)
-    elif tcode == kModuleHandle:
-        return _CLASS_MODULE(ctypes_handle(value.v_handle))
+    # (minjie): Node and class module are not used in DGL.
+    #elif tcode == kModuleHandle:
+    #    return _CLASS_MODULE(ctypes_handle(value.v_handle))
     elif tcode == kFuncHandle:
         fobj = _CLASS_FUNCTION(None, False)
         (<FunctionBase>fobj).chandle = value.v_handle
......
@@ -258,7 +258,6 @@ def extract_ext_funcs(finit):
         raise RuntimeError("cannot initialize with %s" % finit)
     return fdict
 
-
 def _get_api(f):
     flocal = f
     flocal.is_global = True
@@ -285,19 +284,30 @@ def _init_api_prefix(module_name, prefix):
     module = sys.modules[module_name]
     for name in list_global_func_names():
-        if prefix == "api":
-            fname = name
-            if name.startswith("_"):
-                target_module = sys.modules["dgl._api_internal"]
-            else:
-                target_module = module
-        else:
-            if not name.startswith(prefix):
-                continue
-            fname = name[len(prefix)+1:]
-            target_module = module
+        if name.startswith("_"):
+            continue
+        if not name.startswith(prefix):
+            continue
+        fname = name[len(prefix)+1:]
+        target_module = module
         if fname.find(".") != -1:
+            print('Warning: invalid API name "%s".' % fname)
+            continue
+        f = get_global_func(name)
+        ff = _get_api(f)
+        ff.__name__ = fname
+        ff.__doc__ = ("DGL PackedFunc %s. " % fname)
+        setattr(target_module, ff.__name__, ff)
+
+def _init_internal_api():
+    for name in list_global_func_names():
+        if not name.startswith("_"):
+            continue
+        target_module = sys.modules["dgl._api_internal"]
+        fname = name
+        if fname.find(".") != -1:
+            print('Warning: invalid API name "%s".' % fname)
             continue
         f = get_global_func(name)
         ff = _get_api(f)
......
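The split above separates public API registration from internal helpers. A minimal sketch of what the two initializers do with the global function registry (function names are taken from elsewhere in this diff; the exact registry contents are an assumption):

```python
# Registered C functions are named like "dgl.sampling._CAPI_LayerSampling"
# (public, prefixed) or "_GetVectorWrapperSize" (internal, underscore-first).
#
# _init_api('dgl.sampling', __name__) strips the prefix and attaches
# _CAPI_LayerSampling to the calling module, while _init_internal_api()
# attaches every underscore-prefixed global to dgl._api_internal once,
# when dgl.base is imported.
from dgl import _api_internal

size_fn = _api_internal._GetVectorWrapperSize  # a PackedFunc wrapper
```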
@@ -4,6 +4,7 @@ from __future__ import absolute_import
 import warnings
 
 from ._ffi.base import DGLError  # pylint: disable=unused-import
+from ._ffi.function import _init_internal_api
 
 # A special symbol for selecting all nodes or edges.
 ALL = "__ALL__"
@@ -15,3 +16,5 @@ def is_all(arg):
 def dgl_warning(msg):
     """Print out warning messages."""
     warnings.warn(msg)
+
+_init_internal_api()
-# This file contains NodeFlow samplers.
+"""This file contains NodeFlow samplers."""
 import sys
 import numpy as np
@@ -6,9 +6,12 @@ import threading
 import random
 import traceback
 
+from ..._ffi.function import _init_api
 from ... import utils
-from ...node_flow import NodeFlow
+from ...nodeflow import NodeFlow
 from ... import backend as F
+from ...utils import unwrap_to_ptr_list
 
 try:
     import Queue as queue
 except ImportError:
@@ -30,10 +33,12 @@ class SampledSubgraphLoader(object):
             self._expand_factor = expand_factor
             self._num_hops = num_hops
         elif sampler == 'layer':
-            self._layer_sizes = layer_sizes
+            self._layer_sizes = utils.toindex(layer_sizes)
         else:
-            raise NotImplementedError()
+            raise NotImplementedError('Invalid sampler option: "%s"' % sampler)
         self._node_prob = node_prob
+        if node_prob is not None:
+            raise NotImplementedError('Non-uniform sampling is currently not supported.')
         self._add_self_loop = add_self_loop
         if self._node_prob is not None:
             assert self._node_prob.shape[0] == g.number_of_nodes(), \
@@ -44,6 +49,7 @@ class SampledSubgraphLoader(object):
         self._seed_nodes = seed_nodes
         if shuffle:
             self._seed_nodes = F.rand_shuffle(self._seed_nodes)
+        self._seed_nodes = utils.toindex(self._seed_nodes)
         self._num_workers = num_workers
         self._neighbor_type = neighbor_type
         self._nflows = []
@@ -51,25 +57,31 @@ class SampledSubgraphLoader(object):
         self._nflow_idx = 0
 
     def _prefetch(self):
-        seed_ids = []
-        num_nodes = len(self._seed_nodes)
-        for i in range(self._num_workers):
-            start = self._nflow_idx * self._batch_size
-            # if we have visited all nodes, don't do anything.
-            if start >= num_nodes:
-                break
-            end = min((self._nflow_idx + 1) * self._batch_size, num_nodes)
-            seed_ids.append(utils.toindex(self._seed_nodes[start:end]))
-            self._nflow_idx += 1
         if self._sampler == 'neighbor':
-            sgi = self._g._graph.neighbor_sampling(seed_ids, self._expand_factor,
-                                                   self._num_hops, self._neighbor_type,
-                                                   self._node_prob, self._add_self_loop)
+            handles = unwrap_to_ptr_list(_CAPI_UniformSampling(
+                self._g._graph._handle,
+                self._seed_nodes.todgltensor(),
+                int(self._nflow_idx),    # start batch id
+                int(self._batch_size),   # batch size
+                int(self._num_workers),  # num batches
+                int(self._expand_factor),
+                int(self._num_hops),
+                self._neighbor_type,
+                self._add_self_loop))
         elif self._sampler == 'layer':
-            sgi = self._g._graph.layer_sampling(seed_ids, self._layer_sizes,
-                                                self._neighbor_type, self._node_prob)
-        nflows = [NodeFlow(self._g, i) for i in sgi]
+            handles = unwrap_to_ptr_list(_CAPI_LayerSampling(
+                self._g._graph._handle,
+                self._seed_nodes.todgltensor(),
+                int(self._nflow_idx),    # start batch id
+                int(self._batch_size),   # batch size
+                int(self._num_workers),  # num batches
+                self._layer_sizes.todgltensor(),
+                self._neighbor_type))
+        else:
+            raise NotImplementedError('Invalid sampler option: "%s"' % self._sampler)
+        nflows = [NodeFlow(self._g, hdl) for hdl in handles]
         self._nflows.extend(nflows)
+        self._nflow_idx += len(nflows)
 
     def __iter__(self):
         return self
@@ -240,6 +252,8 @@ def NeighborSampler(g, batch_size, expand_factor, num_hops=1,
         * str: indicates some common ways of calculating the number of sampled neighbors,
           e.g., ``sqrt(deg)``.
 
+        Note that no matter how large the expand_factor is, the maximum number of
+        sampled neighbors is the neighborhood size.
     num_hops : int, optional
         The number of hops to sample (i.e., the number of layers in the NodeFlow).
         Default: 1
@@ -279,7 +293,8 @@ def NeighborSampler(g, batch_size, expand_factor, num_hops=1,
                                    expand_factor=expand_factor, num_hops=num_hops,
                                    neighbor_type=neighbor_type, node_prob=node_prob,
                                    seed_nodes=seed_nodes, shuffle=shuffle,
-                                   num_workers=num_workers)
+                                   num_workers=num_workers,
+                                   add_self_loop=add_self_loop)
     if not prefetch:
         return loader
     else:
@@ -324,3 +339,29 @@ def LayerSampler(g, batch_size, layer_sizes,
         return loader
     else:
         return _PrefetchingLoader(loader, num_prefetch=num_workers*2)
+
+def create_full_nodeflow(g, num_layers, add_self_loop=False):
+    """Convert a full graph to a NodeFlow to run an L-layer GNN model.
+
+    Parameters
+    ----------
+    g : DGLGraph
+        a DGL graph
+    num_layers : int
+        The number of layers
+    add_self_loop : bool, default False
+        Whether to add self loop to the sampled NodeFlow.
+        If True, the edge IDs of the self loop edges are -1.
+
+    Returns
+    -------
+    NodeFlow
+        a NodeFlow with a specified number of layers.
+    """
+    batch_size = g.number_of_nodes()
+    expand_factor = g.number_of_nodes()
+    sampler = NeighborSampler(g, batch_size, expand_factor,
+                              num_layers, add_self_loop=add_self_loop)
+    return next(sampler)
+
+_init_api('dgl.sampling', __name__)
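For orientation, a minimal end-to-end sketch of the refactored loader (assuming a small readonly graph; each NodeFlow now comes back as a raw C handle that the `NodeFlow` constructor unpacks):

```python
import networkx as nx
import dgl
from dgl.contrib.sampling import NeighborSampler

# sampling requires an immutable (readonly) graph
g = dgl.DGLGraph(nx.erdos_renyi_graph(1000, 0.01), readonly=True)
for nf in NeighborSampler(g, batch_size=64, expand_factor=10,
                          num_hops=2, neighbor_type='in', shuffle=True):
    # each minibatch is a NodeFlow; the last layer holds the seed nodes
    print(nf.num_layers, nf.layer_size(nf.num_layers - 1))
```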
@@ -10,7 +10,6 @@ from ._ffi.base import c_array
 from ._ffi.function import _init_api
 from .base import DGLError
 from . import backend as F
-from . import ndarray as nd
 from . import utils
 
 GraphIndexHandle = ctypes.c_void_p
@@ -63,8 +62,12 @@ class GraphIndex(object):
         """The actual init function"""
         assert len(src_ids) == len(dst_ids)
         assert len(src_ids) == len(edge_ids)
-        self._handle = _CAPI_DGLGraphCreate(src_ids.todgltensor(), dst_ids.todgltensor(),
-                                            edge_ids.todgltensor(), self._multigraph, num_nodes,
-                                            self._readonly)
+        self._handle = _CAPI_DGLGraphCreate(
+            src_ids.todgltensor(),
+            dst_ids.todgltensor(),
+            edge_ids.todgltensor(),
+            self._multigraph,
+            int(num_nodes),
+            self._readonly)
 
     def add_nodes(self, num):
@@ -218,7 +221,7 @@ class GraphIndex(object):
         bool
             True if the edge exists, False otherwise
         """
-        return bool(_CAPI_DGLGraphHasEdgeBetween(self._handle, u, v))
+        return bool(_CAPI_DGLGraphHasEdgeBetween(self._handle, int(u), int(v)))
 
     def has_edges_between(self, u, v):
         """Return true if the edge exists.
@@ -288,7 +291,7 @@ class GraphIndex(object):
         utils.Index
             The edge id array.
         """
-        return utils.toindex(_CAPI_DGLGraphEdgeId(self._handle, u, v))
+        return utils.toindex(_CAPI_DGLGraphEdgeId(self._handle, int(u), int(v)))
 
     def edge_ids(self, u, v):
         """Return a triplet of arrays that contains the edge IDs.
@@ -445,7 +448,7 @@ class GraphIndex(object):
         int
             The in degree.
         """
-        return _CAPI_DGLGraphInDegree(self._handle, v)
+        return _CAPI_DGLGraphInDegree(self._handle, int(v))
 
     def in_degrees(self, v):
         """Return the in degrees of the nodes.
@@ -476,7 +479,7 @@ class GraphIndex(object):
         int
             The out degree.
         """
-        return _CAPI_DGLGraphOutDegree(self._handle, v)
+        return _CAPI_DGLGraphOutDegree(self._handle, int(v))
 
     def out_degrees(self, v):
         """Return the out degrees of the nodes.
@@ -675,45 +678,6 @@ class GraphIndex(object):
         shuffle_idx = utils.toindex(shuffle_idx) if shuffle_idx is not None else None
         return inc, shuffle_idx
 
-    def neighbor_sampling(self, seed_ids, expand_factor, num_hops, neighbor_type,
-                          node_prob, add_self_loop=False):
-        """Neighborhood sampling"""
-        if len(seed_ids) == 0:
-            return []
-        seed_ids = [v.todgltensor() for v in seed_ids]
-        num_subgs = len(seed_ids)
-        if node_prob is None:
-            rst = _uniform_sampling(self, seed_ids, neighbor_type, num_hops,
-                                    expand_factor, add_self_loop)
-        else:
-            rst = _nonuniform_sampling(self, node_prob, seed_ids, neighbor_type, num_hops,
-                                       expand_factor)
-        return [NodeFlowIndex(rst(i), self, utils.toindex(rst(num_subgs + i)),
-                              utils.toindex(rst(num_subgs * 2 + i)),
-                              utils.toindex(rst(num_subgs * 3 + i)),
-                              utils.toindex(rst(num_subgs * 4 + i))) for i in range(num_subgs)]
-
-    def layer_sampling(self, seed_ids, layer_sizes, neighbor_type, node_prob=None):
-        """Layer sampling"""
-        if len(seed_ids) == 0:
-            return []
-        seed_ids = [v.todgltensor() for v in seed_ids]
-        layer_sizes = nd.from_dlpack(F.zerocopy_to_dlpack(F.tensor(layer_sizes)))
-        if node_prob is None:
-            rst = _layer_uniform_sampling(self, seed_ids, neighbor_type, layer_sizes)
-        else:
-            raise NotImplementedError()
-        num_subgs = len(seed_ids)
-        return [NodeFlowIndex(rst(i), self, utils.toindex(rst(num_subgs + i)),
-                              utils.toindex(rst(num_subgs * 2 + i)),
-                              utils.toindex(rst(num_subgs * 3 + i)),
-                              utils.toindex(rst(num_subgs * 4 + i))) for i in range(num_subgs)]
-
     def random_walk(self, seeds, num_traces, num_hops):
         """Random walk sampling.
@@ -918,75 +882,6 @@ class SubgraphIndex(GraphIndex):
         raise NotImplementedError(
             "SubgraphIndex unpickling is not supported yet.")
 
-class NodeFlowIndex(GraphIndex):
-    """Graph index for a NodeFlow graph.
-
-    Parameters
-    ----------
-    handle : GraphIndexHandle
-        The capi handle.
-    parent : GraphIndex
-        The parent graph index.
-    node_mapping : utils.Index
-        This maps nodes to the parent graph.
-    edge_mapping : utils.Index
-        This maps edges to the parent graph.
-    layers : utils.Index
-        The offsets of the layers.
-    flows : utils.Index
-        The offsets of the flows.
-    """
-    def __init__(self, handle, parent, node_mapping, edge_mapping, layers, flows):
-        super(NodeFlowIndex, self).__init__(handle, parent.is_multigraph(), parent.is_readonly())
-        self._parent = parent
-        self._node_mapping = node_mapping
-        self._edge_mapping = edge_mapping
-        self._layers = layers
-        self._flows = flows
-
-    @property
-    def node_mapping(self):
-        """Return the node mapping to the parent graph.
-
-        Returns
-        -------
-        utils.Index
-            The node mapping.
-        """
-        return self._node_mapping
-
-    @property
-    def edge_mapping(self):
-        """Return the edge mapping to the parent graph.
-
-        Returns
-        -------
-        utils.Index
-            The edge mapping.
-        """
-        return self._edge_mapping
-
-    @property
-    def layers(self):
-        """Return layer offsets."""
-        return self._layers
-
-    @property
-    def flows(self):
-        """Return flow offsets."""
-        return self._flows
-
-    def __getstate__(self):
-        raise NotImplementedError(
-            "NodeFlowIndex pickling is not supported yet.")
-
-    def __setstate__(self, state):
-        raise NotImplementedError(
-            "NodeFlowIndex unpickling is not supported yet.")
-
 def map_to_subgraph_nid(subgraph, parent_nids):
     """Map parent node Ids to the subgraph node Ids.
 def map_to_subgraph_nid(subgraph, parent_nids):
     """Map parent node Ids to the subgraph node Ids.
@@ -1006,33 +901,23 @@ def map_to_subgraph_nid(subgraph, parent_nids):
     return utils.toindex(_CAPI_DGLMapSubgraphNID(subgraph.induced_nodes.todgltensor(),
                                                  parent_nids.todgltensor()))
 
-def map_to_nodeflow_nid(nflow, layer_id, parent_nids):
-    """Map parent node Ids to NodeFlow node Ids in a certain layer.
-
-    Parameters
-    ----------
-    nflow : NodeFlowIndex
-        The graph index of a NodeFlow.
-    layer_id : int
-        The layer Id.
-    parent_nids: utils.Index
-        Node Ids in the parent graph.
-
-    Returns
-    -------
-    utils.Index
-        Node Ids in the NodeFlow.
-    """
-    mapping = nflow.node_mapping.tousertensor()
-    layers = nflow.layers.tonumpy()
-    start = int(layers[layer_id])
-    end = int(layers[layer_id + 1])
-    mapping = mapping[start:end]
-    mapping = utils.toindex(mapping)
-    return utils.toindex(_CAPI_DGLMapSubgraphNID(mapping.todgltensor(),
-                                                 parent_nids.todgltensor()))
+def transform_ids(mapping, ids):
+    """Transform ids by the given mapping.
+
+    Parameters
+    ----------
+    mapping : utils.Index
+        The id mapping. new_id = mapping[old_id]
+    ids : utils.Index
+        The old ids.
+
+    Returns
+    -------
+    utils.Index
+        The new ids.
+    """
+    return utils.toindex(_CAPI_DGLMapSubgraphNID(
+        mapping.todgltensor(), ids.todgltensor()))
 
 def disjoint_union(graphs):
     """Return a disjoint union of the input graphs.
 def disjoint_union(graphs):
     """Return a disjoint union of the input graphs.
@@ -1145,51 +1030,3 @@ def create_graph_index(graph_data=None, multigraph=False, readonly=False):
 
 _init_api("dgl.graph_index")
-
-# TODO(zhengda): we'll support variable-length inputs.
-_NEIGHBOR_SAMPLING_APIS = {
-    1: _CAPI_DGLGraphUniformSampling,
-    2: _CAPI_DGLGraphUniformSampling2,
-    4: _CAPI_DGLGraphUniformSampling4,
-    8: _CAPI_DGLGraphUniformSampling8,
-    16: _CAPI_DGLGraphUniformSampling16,
-    32: _CAPI_DGLGraphUniformSampling32,
-    64: _CAPI_DGLGraphUniformSampling64,
-    128: _CAPI_DGLGraphUniformSampling128,
-}
-
-_EMPTY_ARRAYS = [utils.toindex(F.ones(shape=(0), dtype=F.int64, ctx=F.cpu()))]
-
-def _uniform_sampling(gidx, seed_ids, neigh_type, num_hops, expand_factor, add_self_loop):
-    num_seeds = len(seed_ids)
-    empty_ids = []
-    if len(seed_ids) > 1 and len(seed_ids) not in _NEIGHBOR_SAMPLING_APIS.keys():
-        remain = 2**int(math.ceil(math.log2(len(seed_ids)))) - len(seed_ids)
-        empty_ids = _EMPTY_ARRAYS[0:remain]
-        seed_ids.extend([empty.todgltensor() for empty in empty_ids])
-    assert len(seed_ids) in _NEIGHBOR_SAMPLING_APIS.keys()
-    return _NEIGHBOR_SAMPLING_APIS[len(seed_ids)](gidx._handle, *seed_ids, neigh_type,
-                                                  num_hops, expand_factor, num_seeds,
-                                                  add_self_loop)
-
-_LAYER_SAMPLING_APIS = {
-    1: _CAPI_DGLGraphLayerUniformSampling,
-    2: _CAPI_DGLGraphLayerUniformSampling2,
-    4: _CAPI_DGLGraphLayerUniformSampling4,
-    8: _CAPI_DGLGraphLayerUniformSampling8,
-    16: _CAPI_DGLGraphLayerUniformSampling16,
-    32: _CAPI_DGLGraphLayerUniformSampling32,
-    64: _CAPI_DGLGraphLayerUniformSampling64,
-    128: _CAPI_DGLGraphLayerUniformSampling128,
-}
-
-def _layer_uniform_sampling(gidx, seed_ids, neigh_type, layer_sizes):
-    num_seeds = len(seed_ids)
-    empty_ids = []
-    if len(seed_ids) > 1 and len(seed_ids) not in _LAYER_SAMPLING_APIS.keys():
-        remain = 2**int(math.ceil(math.log2(len(seed_ids)))) - len(seed_ids)
-        empty_ids = _EMPTY_ARRAYS[0:remain]
-        seed_ids.extend([empty.todgltensor() for empty in empty_ids])
-    assert len(seed_ids) in _LAYER_SAMPLING_APIS.keys()
-    return _LAYER_SAMPLING_APIS[len(seed_ids)](gidx._handle, *seed_ids, neigh_type,
-                                               layer_sizes, num_seeds)
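The helper shrinks to a thin wrapper because the layer-offset slicing now lives in `NodeFlow.map_from_parent_nid` (see the nodeflow.py hunk below). A pure-Python sketch of the lookup the C kernel performs, on hypothetical toy data (assuming, as in `map_to_subgraph_nid`, that it inverts the given mapping):

```python
import numpy as np

# mapping[i] is the parent id of local node i; transform_ids answers
# "which local id corresponds to each given parent id?"
mapping = np.array([7, 3, 9, 0])      # local -> parent
parent_nids = np.array([9, 7])
inverse = {int(p): i for i, p in enumerate(mapping)}
local_ids = np.array([inverse[int(p)] for p in parent_nids])
print(local_ids)                      # [2 0]
```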
"""Class for NodeFlow data structure.""" """Class for NodeFlow data structure."""
from __future__ import absolute_import from __future__ import absolute_import
import ctypes
from ._ffi.function import _init_api
from .base import ALL, is_all, DGLError from .base import ALL, is_all, DGLError
from . import backend as F from . import backend as F
from .frame import Frame, FrameRef from .frame import Frame, FrameRef
from .graph import DGLBaseGraph from .graph import DGLBaseGraph
from .graph_index import GraphIndex, transform_ids
from .runtime import ir, scheduler, Runtime from .runtime import ir, scheduler, Runtime
from . import utils from . import utils
from .view import LayerView, BlockView from .view import LayerView, BlockView
def _copy_to_like(arr1, arr2): __all__ = ['NodeFlow']
return F.copy_to(arr1, F.context(arr2))
def _get_frame(frame, names, ids):
col_dict = {name: frame[name][_copy_to_like(ids, frame[name])] for name in names}
if len(col_dict) == 0:
return FrameRef(Frame(num_rows=len(ids)))
else:
return FrameRef(Frame(col_dict))
def _update_frame(frame, names, ids, new_frame):
col_dict = {name: new_frame[name] for name in names}
if len(col_dict) > 0:
frame.update_rows(ids, FrameRef(Frame(col_dict)), inplace=True)
NodeFlowHandle = ctypes.c_void_p
 class NodeFlow(DGLBaseGraph):
-    """The NodeFlow class stores the sampling results of Neighbor sampling and Layer-wise sampling.
-
-    These sampling algorithms generate graphs with multiple layers. The edges connect the nodes
-    between two layers while there don't exist edges between the nodes in the same layer.
-
-    We store multiple layers of the sampling results in a single graph. We store extra information,
-    such as the node and edge mapping from the NodeFlow graph to the parent graph.
-
-    Parameters
-    ----------
-    parent : DGLGraph
-        The parent graph
-    graph_index : NodeFlowIndex
-        The graph index of the NodeFlow graph.
-    """
-    def __init__(self, parent, graph_idx):
-        super(NodeFlow, self).__init__(graph_idx)
-        self._parent = parent
-        self._node_mapping = graph_idx.node_mapping
-        self._edge_mapping = graph_idx.edge_mapping
-        self._layer_offsets = graph_idx.layers.tonumpy()
-        self._block_offsets = graph_idx.flows.tonumpy()
+    """The NodeFlow class stores the sampling results of Neighbor
+    sampling and Layer-wise sampling.
+
+    These sampling algorithms generate graphs with multiple layers. The
+    edges connect the nodes between two layers while there don't exist
+    edges between the nodes in the same layer.
+
+    We store multiple layers of the sampling results in a single graph.
+    We store extra information, such as the node and edge mapping from
+    the NodeFlow graph to the parent graph.
+
+    DO NOT create NodeFlow objects directly. Use a sampling method to
+    generate NodeFlows instead.
+
+    Parameters
+    ----------
+    parent : DGLGraph
+        The parent graph.
+    handle : NodeFlowHandle
+        The handle to the underlying C structure.
+    """
+    def __init__(self, parent, handle):
+        # NOTE(minjie): handle is a pointer to the underlying C++ structure
+        #   defined in include/dgl/sampler.h. The constructor will save
+        #   all its members on the python side and destroy the handle
+        #   afterwards. One can view the given handle object as a transient
+        #   argument pack to construct this python class.
+        # TODO(minjie): We should use TVM's Node system as a cleaner solution later.
+        super(NodeFlow, self).__init__(GraphIndex(_CAPI_NodeFlowGetGraph(handle)))
+        self._parent = parent
+        self._node_mapping = utils.toindex(_CAPI_NodeFlowGetNodeMapping(handle))
+        self._edge_mapping = utils.toindex(_CAPI_NodeFlowGetEdgeMapping(handle))
+        self._layer_offsets = utils.toindex(
+            _CAPI_NodeFlowGetLayerOffsets(handle)).tonumpy()
+        self._block_offsets = utils.toindex(
+            _CAPI_NodeFlowGetBlockOffsets(handle)).tonumpy()
+        _CAPI_NodeFlowFree(handle)
+        # node/edge frames
         self._node_frames = [FrameRef(Frame(num_rows=self.layer_size(i))) \
                              for i in range(self.num_layers)]
         self._edge_frames = [FrameRef(Frame(num_rows=self.block_size(i))) \
@@ -252,6 +259,32 @@ class NodeFlow(DGLBaseGraph):
         """
         return self._edge_mapping.tousertensor()[eid]
 
+    def map_from_parent_nid(self, layer_id, parent_nids):
+        """Map parent node Ids to NodeFlow node Ids in a certain layer.
+
+        Parameters
+        ----------
+        layer_id : int
+            The layer Id.
+        parent_nids : list or Tensor
+            Node Ids in the parent graph.
+
+        Returns
+        -------
+        Tensor
+            Node Ids in the NodeFlow.
+        """
+        parent_nids = utils.toindex(parent_nids)
+        layers = self._layer_offsets
+        start = int(layers[layer_id])
+        end = int(layers[layer_id + 1])
+        # TODO(minjie): should not directly use []
+        mapping = self._node_mapping.tousertensor()
+        mapping = mapping[start:end]
+        mapping = utils.toindex(mapping)
+        nflow_ids = transform_ids(mapping, parent_nids)
+        return nflow_ids.tousertensor()
+
     def layer_in_degree(self, layer_id):
         """Return the in-degree of the nodes in the specified layer.
@@ -677,7 +710,7 @@ class NodeFlow(DGLBaseGraph):
         if is_all(flow_range):
             flow_range = range(0, self.num_blocks)
         elif isinstance(flow_range, slice):
-            if slice.step is not 1:
+            if slice.step != 1:
                 raise DGLError("We can't propagate flows and skip some of them")
             flow_range = range(flow_range.start, flow_range.stop)
         else:
@@ -708,26 +741,20 @@ class NodeFlow(DGLBaseGraph):
             self.block_compute(i, message_func, reduce_func, apply_node_func,
                                inplace=inplace)
 
-def create_full_node_flow(g, num_layers, add_self_loop=False):
-    """Convert a full graph to NodeFlow to run an L-layer GNN model.
-
-    Parameters
-    ----------
-    g : DGLGraph
-        a DGL graph
-    num_layers : int
-        The number of layers
-    add_self_loop : bool, default False
-        Whether to add self loop to the sampled NodeFlow.
-        If True, the edge IDs of the self loop edges are -1.
-
-    Returns
-    -------
-    NodeFlow
-        a NodeFlow with a specified number of layers.
-    """
-    seeds = [utils.toindex(F.arange(0, g.number_of_nodes()))]
-    nfi = g._graph.neighbor_sampling(seeds, g.number_of_nodes(), num_layers,
-                                     'in', None, add_self_loop)
-    return NodeFlow(g, nfi[0])
+def _copy_to_like(arr1, arr2):
+    return F.copy_to(arr1, F.context(arr2))
+
+def _get_frame(frame, names, ids):
+    col_dict = {name: frame[name][_copy_to_like(ids, frame[name])] for name in names}
+    if len(col_dict) == 0:
+        return FrameRef(Frame(num_rows=len(ids)))
+    else:
+        return FrameRef(Frame(col_dict))
+
+def _update_frame(frame, names, ids, new_frame):
+    col_dict = {name: new_frame[name] for name in names}
+    if len(col_dict) > 0:
+        frame.update_rows(ids, FrameRef(Frame(col_dict)), inplace=True)
+
+_init_api("dgl.nodeflow", __name__)
"""Utility module.""" """Utility module."""
from __future__ import absolute_import, division from __future__ import absolute_import, division
import ctypes
from collections.abc import Mapping, Iterable from collections.abc import Mapping, Iterable
from functools import wraps from functools import wraps
import numpy as np import numpy as np
from . import _api_internal
from .base import DGLError from .base import DGLError
from . import backend as F from . import backend as F
from . import ndarray as nd from . import ndarray as nd
...@@ -483,3 +485,27 @@ def get_ndata_name(g, name): ...@@ -483,3 +485,27 @@ def get_ndata_name(g, name):
while name in g.ndata: while name in g.ndata:
name += '_' name += '_'
return name return name
def unwrap_to_ptr_list(wrapper):
"""Convert the internal vector wrapper to a python list of ctypes.c_void_p.
The wrapper will be destroyed after this function.
Parameters
----------
wrapper : ctypes.c_void_p
The handler to the wrapper.
Returns
-------
list of ctypes.c_void_p
A python list of void pointers.
"""
size = _api_internal._GetVectorWrapperSize(wrapper)
if size == 0:
return []
data = _api_internal._GetVectorWrapperData(wrapper)
data = ctypes.cast(data, ctypes.POINTER(ctypes.c_void_p * size))
rst = [ctypes.c_void_p(x) for x in data.contents]
_api_internal._FreeVectorWrapper(wrapper)
return rst
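The ctypes gymnastics in `unwrap_to_ptr_list` can be hard to read; here is a self-contained analogue of the pointer copy it performs (a toy buffer stands in for the C++ `std::vector<void*>` data):

```python
import ctypes

# pretend this is the wrapper's pointers.data(): a contiguous void*[3]
buf = (ctypes.c_void_p * 3)(11, 22, 33)
data = ctypes.cast(buf, ctypes.POINTER(ctypes.c_void_p * 3))
rst = [ctypes.c_void_p(x) for x in data.contents]
print([p.value for p in rst])  # [11, 22, 33]
```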
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-import sys, os, platform
+import sys, os, platform, sysconfig
 import shutil
 import glob
 
 from setuptools import find_packages
 from setuptools.dist import Distribution
-from setuptools import setup
+
+# need to use distutils.core for correct placement of cython dll
+if '--inplace' in sys.argv:
+    from distutils.core import setup
+    from distutils.extension import Extension
+else:
+    from setuptools import setup
+    from setuptools.extension import Extension
 
 class BinaryDistribution(Distribution):
     def has_ext_modules(self):
@@ -30,6 +37,49 @@ def get_lib_path():
 
 LIBS, VERSION = get_lib_path()
 
+def config_cython():
+    """Try to configure cython and return cython configuration"""
+    if os.name == 'nt':
+        print("WARNING: Cython is not supported on Windows, will compile without cython module")
+        return []
+    sys_cflags = sysconfig.get_config_var("CFLAGS")
+    if "i386" in sys_cflags and "x86_64" in sys_cflags:
+        print("WARNING: Cython library may not be compiled correctly with both i386 and x64")
+        return []
+    try:
+        from Cython.Build import cythonize
+        # from setuptools.extension import Extension
+        if sys.version_info >= (3, 0):
+            subdir = "_cy3"
+        else:
+            subdir = "_cy2"
+        ret = []
+        path = "dgl/_ffi/_cython"
+        if os.name == 'nt':
+            library_dirs = ['dgl', '../build/Release', '../build']
+            libraries = ['libtvm']
+        else:
+            library_dirs = None
+            libraries = None
+        for fn in os.listdir(path):
+            if not fn.endswith(".pyx"):
+                continue
+            ret.append(Extension(
+                "dgl._ffi.%s.%s" % (subdir, fn[:-4]),
+                ["dgl/_ffi/_cython/%s" % fn],
+                include_dirs=["../include/",
+                              "../third_party/dmlc-core/include",
+                              "../third_party/dlpack/include",
+                             ],
+                library_dirs=library_dirs,
+                libraries=libraries,
+                language="c++"))
+        return cythonize(ret)
+    except ImportError:
+        print("WARNING: Cython is not installed, will compile without cython module")
+        return []
+
 include_libs = False
 wheel_include_libs = False
 if "bdist_wheel" in sys.argv or os.getenv('CONDA_BUILD'):
@@ -74,6 +124,7 @@ setup(
     ],
     url='https://github.com/dmlc/dgl',
     distclass=BinaryDistribution,
+    ext_modules=config_cython(),
    classifiers=[
         'Development Status :: 3 - Alpha',
         'Programming Language :: Python :: 3',
......
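A small sketch of the module naming scheme `config_cython()` produces (pure string logic, runnable without Cython installed):

```python
import sys

def ext_name(pyx_filename):
    # one extension per .pyx, placed under _cy2 or _cy3 by interpreter major
    subdir = "_cy3" if sys.version_info >= (3, 0) else "_cy2"
    return "dgl._ffi.%s.%s" % (subdir, pyx_filename[:-4])

print(ext_name("core.pyx"))  # dgl._ffi._cy3.core on Python 3
```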
@@ -34,5 +34,25 @@ PackedFunc ConvertNDArrayVectorToPackedFunc(const std::vector<NDArray>& vec) {
   return PackedFunc(body);
 }
 
+DGL_REGISTER_GLOBAL("_GetVectorWrapperSize")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    const CAPIVectorWrapper* wrapper = static_cast<const CAPIVectorWrapper*>(ptr);
+    *rv = static_cast<int64_t>(wrapper->pointers.size());
+  });
+
+DGL_REGISTER_GLOBAL("_GetVectorWrapperData")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    CAPIVectorWrapper* wrapper = static_cast<CAPIVectorWrapper*>(ptr);
+    *rv = static_cast<void*>(wrapper->pointers.data());
+  });
+
+DGL_REGISTER_GLOBAL("_FreeVectorWrapper")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    CAPIVectorWrapper* wrapper = static_cast<CAPIVectorWrapper*>(ptr);
+    delete wrapper;
+  });
+
 }  // namespace dgl
@@ -52,6 +52,29 @@ dgl::runtime::NDArray CopyVectorToNDArray(
   return a;
 }
 
+/* A structure used to return a vector of void* pointers. */
+struct CAPIVectorWrapper {
+  // The pointer vector.
+  std::vector<void*> pointers;
+};
+
+/*!
+ * \brief A helper function used to return a vector of pointers from C to the frontend.
+ *
+ * Note that the function will move the given vector memory into the returned
+ * wrapper object.
+ *
+ * \param vec The given pointer vector.
+ * \return A wrapper object containing the given data.
+ */
+template<typename PType>
+CAPIVectorWrapper* WrapVectorReturn(std::vector<PType*> vec) {
+  CAPIVectorWrapper* wrapper = new CAPIVectorWrapper;
+  wrapper->pointers.reserve(vec.size());
+  wrapper->pointers.insert(wrapper->pointers.end(), vec.begin(), vec.end());
+  return wrapper;
+}
+
 }  // namespace dgl
 
 #endif  // DGL_C_API_COMMON_H_
@@ -68,30 +68,6 @@ PackedFunc ConvertSubgraphToPackedFunc(const Subgraph& sg) {
   return PackedFunc(body);
 }
 
-// Convert Sampled Subgraph structures to PackedFunc.
-PackedFunc ConvertSubgraphToPackedFunc(const std::vector<NodeFlow>& sg) {
-  auto body = [sg] (DGLArgs args, DGLRetValue* rv) {
-    const uint64_t which = args[0];
-    if (which < sg.size()) {
-      GraphInterface* gptr = sg[which].graph->Reset();
-      GraphHandle ghandle = gptr;
-      *rv = ghandle;
-    } else if (which >= sg.size() && which < sg.size() * 2) {
-      *rv = std::move(sg[which - sg.size()].node_mapping);
-    } else if (which >= sg.size() * 2 && which < sg.size() * 3) {
-      *rv = std::move(sg[which - sg.size() * 2].edge_mapping);
-    } else if (which >= sg.size() * 3 && which < sg.size() * 4) {
-      *rv = std::move(sg[which - sg.size() * 3].layer_offsets);
-    } else if (which >= sg.size() * 4 && which < sg.size() * 5) {
-      *rv = std::move(sg[which - sg.size() * 4].flow_offsets);
-    } else {
-      LOG(FATAL) << "invalid choice";
-    }
-  };
-  // TODO(minjie): figure out a better way of returning a complex result.
-  return PackedFunc(body);
-}
-
 }  // namespace
 
 ///////////////////////////// Graph API ///////////////////////////////////
@@ -433,89 +409,6 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLineGraph")
     *rv = lghandle;
   });
 
-template<int num_seeds>
-void CAPI_NeighborUniformSample(DGLArgs args, DGLRetValue* rv) {
-  GraphHandle ghandle = args[0];
-  std::vector<IdArray> seeds(num_seeds);
-  for (size_t i = 0; i < seeds.size(); i++)
-    seeds[i] = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[i + 1]));
-  std::string neigh_type = args[num_seeds + 1];
-  const int num_hops = args[num_seeds + 2];
-  const int num_neighbors = args[num_seeds + 3];
-  const int num_valid_seeds = args[num_seeds + 4];
-  const bool add_self_loop = args[num_seeds + 5];
-  const GraphInterface *ptr = static_cast<const GraphInterface *>(ghandle);
-  const ImmutableGraph *gptr = dynamic_cast<const ImmutableGraph*>(ptr);
-  CHECK(gptr) << "sampling isn't implemented in mutable graph";
-  CHECK(num_valid_seeds <= num_seeds);
-  std::vector<NodeFlow> subgs(seeds.size());
-#pragma omp parallel for
-  for (int i = 0; i < num_valid_seeds; i++) {
-    subgs[i] = SamplerOp::NeighborUniformSample(gptr, seeds[i], neigh_type, num_hops,
-                                                num_neighbors, add_self_loop);
-  }
-  *rv = ConvertSubgraphToPackedFunc(subgs);
-}
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling")
-.set_body(CAPI_NeighborUniformSample<1>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling2")
-.set_body(CAPI_NeighborUniformSample<2>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling4")
-.set_body(CAPI_NeighborUniformSample<4>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling8")
-.set_body(CAPI_NeighborUniformSample<8>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling16")
-.set_body(CAPI_NeighborUniformSample<16>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling32")
-.set_body(CAPI_NeighborUniformSample<32>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling64")
-.set_body(CAPI_NeighborUniformSample<64>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling128")
-.set_body(CAPI_NeighborUniformSample<128>);
-
-template<int num_seeds>
-void CAPI_LayerUniformSample(DGLArgs args, DGLRetValue* rv) {
-  GraphHandle ghandle = args[0];
-  std::vector<IdArray> seeds(num_seeds);
-  for (size_t i = 0; i < seeds.size(); i++)
-    seeds[i] = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[i + 1]));
-  std::string neigh_type = args[num_seeds + 1];
-  auto ls_array = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[num_seeds + 2]));
-  size_t *ls_data = static_cast<size_t*>(ls_array->data);
-  size_t ls_len = ls_array->shape[0];
-  std::vector<size_t> layer_sizes;
-  std::copy(ls_data, ls_data + ls_len, std::back_inserter(layer_sizes));
-  const int num_valid_seeds = args[num_seeds + 3];
-  const GraphInterface *ptr = static_cast<const GraphInterface *>(ghandle);
-  const ImmutableGraph *gptr = dynamic_cast<const ImmutableGraph*>(ptr);
-  CHECK(gptr) << "sampling isn't implemented in mutable graph";
-  CHECK(num_valid_seeds <= num_seeds);
-  std::vector<NodeFlow> subgs(seeds.size());
-#pragma omp parallel for
-  for (int i = 0; i < num_valid_seeds; i++) {
-    subgs[i] = SamplerOp::LayerUniformSample(gptr, seeds[i], neigh_type, layer_sizes);
-  }
-  *rv = ConvertSubgraphToPackedFunc(subgs);
-}
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling")
-.set_body(CAPI_LayerUniformSample<1>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling2")
-.set_body(CAPI_LayerUniformSample<2>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling4")
-.set_body(CAPI_LayerUniformSample<4>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling8")
-.set_body(CAPI_LayerUniformSample<8>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling16")
-.set_body(CAPI_LayerUniformSample<16>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling32")
-.set_body(CAPI_LayerUniformSample<32>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling64")
-.set_body(CAPI_LayerUniformSample<64>);
-
-DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling128")
-.set_body(CAPI_LayerUniformSample<128>);
-
 DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphGetAdj")
 .set_body([] (DGLArgs args, DGLRetValue* rv) {
     GraphHandle ghandle = args[0];
......
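For context on what was just deleted: the old code returned all sampling results through one PackedFunc indexed by slot, which the frontend probed as `rst(i)`, `rst(num_subgs + i)`, and so on. A toy Python model of that convention (made-up data, no DGL needed):

```python
def make_packed(graphs, node_maps, edge_maps, layer_offs, flow_offs):
    # slots: [0,n) graphs, [n,2n) node mappings, [2n,3n) edge mappings, ...
    slots = graphs + node_maps + edge_maps + layer_offs + flow_offs
    return lambda which: slots[which]

rst = make_packed(['g0', 'g1'], ['nm0', 'nm1'], ['em0', 'em1'],
                  ['lo0', 'lo1'], ['fo0', 'fo1'])
num_subgs = 2
assert rst(1) == 'g1' and rst(num_subgs + 1) == 'nm1'
```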
@@ -11,6 +11,7 @@
 #include <cstdlib>
 #include <cmath>
 #include <numeric>
+#include "../c_api_common.h"
 
 #ifdef _MSC_VER
 // rand in MS compiler works well in multi-threading.
@@ -19,10 +20,15 @@ int rand_r(unsigned *seed) {
 }
 #endif
 
+using dgl::runtime::DGLArgs;
+using dgl::runtime::DGLArgValue;
+using dgl::runtime::DGLRetValue;
+using dgl::runtime::PackedFunc;
+using dgl::runtime::NDArray;
+
 namespace dgl {
 
 namespace {
 /*
  * ArrayHeap is used to sample elements from vector
  */
@@ -373,29 +379,28 @@ NodeFlow ConstructNodeFlow(std::vector<dgl_id_t> neighbor_list,
 }
 
 NodeFlow SampleSubgraph(const ImmutableGraph *graph,
-                        IdArray seed_arr,
+                        const std::vector<dgl_id_t>& seeds,
                         const float* probability,
                         const std::string &edge_type,
                         int num_hops,
                         size_t num_neighbor,
                         const bool add_self_loop) {
   unsigned int time_seed = time(nullptr);
-  size_t num_seeds = seed_arr->shape[0];
+  const size_t num_seeds = seeds.size();
   auto orig_csr = edge_type == "in" ? graph->GetInCSR() : graph->GetOutCSR();
   const dgl_id_t* val_list = orig_csr->edge_ids.data();
   const dgl_id_t* col_list = orig_csr->indices.data();
   const int64_t* indptr = orig_csr->indptr.data();
-  const dgl_id_t* seed = static_cast<dgl_id_t*>(seed_arr->data);
 
   std::unordered_set<dgl_id_t> sub_ver_map;  // The vertex Ids in a layer.
   std::vector<std::pair<dgl_id_t, int> > sub_vers;
   sub_vers.reserve(num_seeds * 10);
   // add seed vertices
   for (size_t i = 0; i < num_seeds; ++i) {
-    auto ret = sub_ver_map.insert(seed[i]);
+    auto ret = sub_ver_map.insert(seeds[i]);
     // If the vertex is inserted successfully.
     if (ret.second) {
-      sub_vers.emplace_back(seed[i], 0);
+      sub_vers.emplace_back(seeds[i], 0);
     }
   }
   std::vector<dgl_id_t> tmp_sampled_src_list;
@@ -478,7 +483,51 @@ NodeFlow SampleSubgraph(const ImmutableGraph *graph,
 
 }  // namespace
 
+DGL_REGISTER_GLOBAL("nodeflow._CAPI_NodeFlowGetGraph")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    const NodeFlow* nflow = static_cast<NodeFlow*>(ptr);
+    GraphInterface* gptr = nflow->graph->Reset();
+    *rv = gptr;
+  });
+
+DGL_REGISTER_GLOBAL("nodeflow._CAPI_NodeFlowGetNodeMapping")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    const NodeFlow* nflow = static_cast<NodeFlow*>(ptr);
+    *rv = nflow->node_mapping;
+  });
+
+DGL_REGISTER_GLOBAL("nodeflow._CAPI_NodeFlowGetEdgeMapping")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    const NodeFlow* nflow = static_cast<NodeFlow*>(ptr);
+    *rv = nflow->edge_mapping;
+  });
+
+DGL_REGISTER_GLOBAL("nodeflow._CAPI_NodeFlowGetLayerOffsets")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    const NodeFlow* nflow = static_cast<NodeFlow*>(ptr);
+    *rv = nflow->layer_offsets;
+  });
+
+DGL_REGISTER_GLOBAL("nodeflow._CAPI_NodeFlowGetBlockOffsets")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    const NodeFlow* nflow = static_cast<NodeFlow*>(ptr);
+    *rv = nflow->flow_offsets;
+  });
+
+DGL_REGISTER_GLOBAL("nodeflow._CAPI_NodeFlowFree")
+.set_body([] (DGLArgs args, DGLRetValue* rv) {
+    void* ptr = args[0];
+    NodeFlow* nflow = static_cast<NodeFlow*>(ptr);
+    delete nflow;
+  });
+
-NodeFlow SamplerOp::NeighborUniformSample(const ImmutableGraph *graph, IdArray seeds,
+NodeFlow SamplerOp::NeighborUniformSample(const ImmutableGraph *graph,
+                                          const std::vector<dgl_id_t>& seeds,
                                           const std::string &edge_type,
                                           int num_hops, int expand_factor,
                                           const bool add_self_loop) {
@@ -535,8 +584,8 @@ IdArray SamplerOp::RandomWalk(
 namespace {
 
 void ConstructLayers(const int64_t *indptr,
                      const dgl_id_t *indices,
-                     const IdArray seed_array,
-                     const std::vector<size_t> &layer_sizes,
+                     const std::vector<dgl_id_t>& seed_array,
+                     IdArray layer_sizes,
                      std::vector<dgl_id_t> *layer_offsets,
                      std::vector<dgl_id_t> *node_mapping,
                      std::vector<int64_t> *actl_layer_sizes,
@@ -546,17 +595,17 @@ namespace {
    * layers via uniform layer-wise sampling, and return the resultant layers and their
    * corresponding probabilities.
    */
-  const dgl_id_t* seed_data = static_cast<dgl_id_t*>(seed_array->data);
-  size_t seed_len = seed_array->shape[0];
-  std::copy(seed_data, seed_data + seed_len, std::back_inserter(*node_mapping));
+  std::copy(seed_array.begin(), seed_array.end(), std::back_inserter(*node_mapping));
   actl_layer_sizes->push_back(node_mapping->size());
   probabilities->insert(probabilities->end(), node_mapping->size(), 1);
+  const int64_t* layer_sizes_data = static_cast<int64_t*>(layer_sizes->data);
+  const int64_t num_layers = layer_sizes->shape[0];
   size_t curr = 0;
   size_t next = node_mapping->size();
   unsigned int rand_seed = time(nullptr);
-  for (auto i = layer_sizes.rbegin(); i != layer_sizes.rend(); ++i) {
-    auto layer_size = *i;
+  for (int64_t i = num_layers - 1; i >= 0; --i) {
+    const int64_t layer_size = layer_sizes_data[i];
     std::unordered_set<dgl_id_t> candidate_set;
     for (auto j = curr; j != next; ++j) {
       auto src = (*node_mapping)[j];
@@ -569,7 +618,7 @@ namespace {
       std::unordered_map<dgl_id_t, size_t> n_occurrences;
       auto n_candidates = candidate_vector.size();
-      for (size_t j = 0; j != layer_size; ++j) {
+      for (int64_t j = 0; j != layer_size; ++j) {
         auto dst = candidate_vector[rand_r(&rand_seed) % n_candidates];
         if (!n_occurrences.insert(std::make_pair(dst, 1)).second) {
           ++n_occurrences[dst];
@@ -647,9 +696,9 @@ namespace {
 }  // namespace
 
 NodeFlow SamplerOp::LayerUniformSample(const ImmutableGraph *graph,
-                                       const IdArray seed_array,
+                                       const std::vector<dgl_id_t>& seeds,
                                        const std::string &neighbor_type,
-                                       const std::vector<size_t> &layer_sizes) {
+                                       IdArray layer_sizes) {
   const auto g_csr = neighbor_type == "in" ? graph->GetInCSR() : graph->GetOutCSR();
   const int64_t *indptr = g_csr->indptr.data();
   const dgl_id_t *indices = g_csr->indices.data();
@@ -661,7 +710,7 @@ NodeFlow SamplerOp::LayerUniformSample(const ImmutableGraph *graph,
   std::vector<float> probabilities;
   ConstructLayers(indptr,
                   indices,
-                  seed_array,
+                  seeds,
                   layer_sizes,
                   &layer_offsets,
                   &node_mapping,
@@ -715,4 +764,82 @@ NodeFlow SamplerOp::LayerUniformSample(const ImmutableGraph *graph,
   return nf;
 }
DGL_REGISTER_GLOBAL("sampling._CAPI_UniformSampling")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
// arguments
const GraphHandle ghdl = args[0];
const IdArray seed_nodes = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
const int64_t batch_start_id = args[2];
const int64_t batch_size = args[3];
const int64_t max_num_workers = args[4];
const int64_t expand_factor = args[5];
const int64_t num_hops = args[6];
const std::string neigh_type = args[7];
const bool add_self_loop = args[8];
// process args
const GraphInterface *ptr = static_cast<const GraphInterface *>(ghdl);
const ImmutableGraph *gptr = dynamic_cast<const ImmutableGraph*>(ptr);
CHECK(gptr) << "sampling isn't implemented in mutable graph";
CHECK(IsValidIdArray(seed_nodes));
const dgl_id_t* seed_nodes_data = static_cast<dgl_id_t*>(seed_nodes->data);
const int64_t num_seeds = seed_nodes->shape[0];
const int64_t num_workers = std::min(max_num_workers,
(num_seeds + batch_size - 1) / batch_size - batch_start_id);
// generate node flows
std::vector<NodeFlow*> nflows(num_workers);
#pragma omp parallel for
for (int i = 0; i < num_workers; i++) {
// create per-worker seed nodes.
const int64_t start = (batch_start_id + i) * batch_size;
const int64_t end = std::min(start + batch_size, num_seeds);
// TODO(minjie): the vector allocation/copy is unnecessary
std::vector<dgl_id_t> worker_seeds(end - start);
std::copy(seed_nodes_data + start, seed_nodes_data + end,
worker_seeds.begin());
nflows[i] = new NodeFlow();
*nflows[i] = SamplerOp::NeighborUniformSample(
gptr, worker_seeds, neigh_type, num_hops, expand_factor, add_self_loop);
}
*rv = WrapVectorReturn(nflows);
});
DGL_REGISTER_GLOBAL("sampling._CAPI_LayerSampling")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
// arguments
const GraphHandle ghdl = args[0];
const IdArray seed_nodes = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
const int64_t batch_start_id = args[2];
const int64_t batch_size = args[3];
const int64_t max_num_workers = args[4];
const IdArray layer_sizes = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[5]));
const std::string neigh_type = args[6];
// process args
const GraphInterface *ptr = static_cast<const GraphInterface *>(ghdl);
const ImmutableGraph *gptr = dynamic_cast<const ImmutableGraph*>(ptr);
CHECK(gptr) << "sampling isn't implemented in mutable graph";
CHECK(IsValidIdArray(seed_nodes));
const dgl_id_t* seed_nodes_data = static_cast<dgl_id_t*>(seed_nodes->data);
const int64_t num_seeds = seed_nodes->shape[0];
const int64_t num_workers = std::min(max_num_workers,
(num_seeds + batch_size - 1) / batch_size - batch_start_id);
// generate node flows
std::vector<NodeFlow*> nflows(num_workers);
#pragma omp parallel for
for (int i = 0; i < num_workers; i++) {
// create per-worker seed nodes.
const int64_t start = (batch_start_id + i) * batch_size;
const int64_t end = std::min(start + batch_size, num_seeds);
// TODO(minjie): the vector allocation/copy is unnecessary
std::vector<dgl_id_t> worker_seeds(end - start);
std::copy(seed_nodes_data + start, seed_nodes_data + end,
worker_seeds.begin());
nflows[i] = new NodeFlow();
*nflows[i] = SamplerOp::LayerUniformSample(
gptr, worker_seeds, neigh_type, layer_sizes);
}
*rv = WrapVectorReturn(nflows);
});
} // namespace dgl } // namespace dgl
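Both new C APIs share the same batching arithmetic: a call starting at `batch_start_id` produces at most `max_num_workers` NodeFlows, clipped by the number of remaining batches. A pure-Python restatement with toy numbers:

```python
def num_nodeflows(num_seeds, batch_size, batch_start_id, max_num_workers):
    total_batches = (num_seeds + batch_size - 1) // batch_size  # ceil division
    return min(max_num_workers, total_batches - batch_start_id)

assert num_nodeflows(1000, 64, 0, 8) == 8    # plenty of batches left
assert num_nodeflows(1000, 64, 15, 8) == 1   # only the final partial batch
```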
@@ -14,6 +14,7 @@
 #define DGL_RUNTIME_PACK_ARGS_H_
 
 #include <dgl/runtime/c_runtime_api.h>
+#include <dgl/runtime/packed_func.h>
 #include <vector>
 #include <cstring>
......