Commit 750e5037 authored by Lingfan Yu's avatar Lingfan Yu Committed by Minjie Wang
Browse files

Efficient send_and_recv (#83)

* python side send_and_recv shortcut

* common util function for c apis

* degree bucketing scheduler

* scheduler c apis

* cmake

* python side logics

* minor

* fix multiple msg/red func cases

* remove shell echo command in Jenkinsfile...

* fuse apply for send_and_recv

* fuse update func for update_all

* typo...

* handle anonymous

* add degree bucketing for entire graph

* degree bucketing executor

* minor

* style

* rename graph_mapping to recv_nodes

* fix some comments
parent b2c1c4fa
......@@ -86,9 +86,7 @@ endif(MSVC)
#assign_source_group("Include" ${GROUP_INCLUDE})
# Source file lists
file(GLOB CORE_SRCS
src/graph/*.cc
)
file(GLOB CORE_SRCS src/graph/*.cc src/*.cc src/scheduler/*.cc)
file(GLOB RUNTIME_SRCS src/runtime/*.cc)
......
......@@ -33,7 +33,6 @@ pipeline {
stage('TEST') {
steps {
withEnv(["DGL_LIBRARY_PATH=${env.WORKSPACE}/build"]) {
sh 'echo $DGL_LIBRARY_PATH'
sh 'nosetests tests -v --with-xunit'
sh 'nosetests tests/pytorch -v --with-xunit'
}
......@@ -76,7 +75,6 @@ pipeline {
stage('TEST') {
steps {
withEnv(["DGL_LIBRARY_PATH=${env.WORKSPACE}/build"]) {
sh 'echo $DGL_LIBRARY_PATH'
sh 'nosetests tests -v --with-xunit'
sh 'nosetests tests/pytorch -v --with-xunit'
}
......
// DGL Scheduler interface
#ifndef DGL_SCHEDULER_H_
#define DGL_SCHEDULER_H_

#include "runtime/ndarray.h"

#include <vector>

namespace dgl {

// Id arrays are TVM NDArrays holding int64 vertex/message ids.
typedef tvm::runtime::NDArray IdArray;

namespace sched {

/*!
 * \brief Generate degree bucketing schedule
 * \param vids The destination vertex for messages (one entry per message)
 * \note If there are multiple messages going into the same destination vertex, then
 *       there will be multiple copies of the destination vertex in vids
 * \return a vector of 5 IdArrays for degree bucketing. The 5 arrays are:
 *         degrees: the in-degree shared by all nodes of each bucket
 *         nids: destination node ids, grouped by bucket
 *         nid_section: number of nodes in each bucket (used to split nids)
 *         mids: message ids, grouped by bucket
 *         mid_section: number of messages in each bucket (used to split mids)
 */
std::vector<IdArray> DegreeBucketing(const IdArray& vids);

}  // namespace sched
}  // namespace dgl

#endif  // DGL_SCHEDULER_H_
......@@ -779,12 +779,34 @@ class DGLGraph(object):
apply_node_func : callable
The apply node function.
"""
self._apply_nodes(v, apply_node_func)
def _apply_nodes(self, v, apply_node_func="default", reduce_accum=None):
"""Internal apply nodes
Parameters
----------
reduce_accum: dict-like
The output of reduce func
"""
if apply_node_func == "default":
apply_node_func = self._apply_node_func
if not apply_node_func:
# Skip none function call.
if reduce_accum is not None:
# write reduce result back
self.set_n_repr(reduce_accum, v)
return
new_repr = apply_node_func(self.get_n_repr(v))
# take out current node repr
curr_repr = self.get_n_repr(v)
if reduce_accum is not None:
# merge current node_repr with reduce output
curr_repr = utils.HybridDict(reduce_accum, curr_repr)
new_repr = apply_node_func(curr_repr)
if reduce_accum is not None and utils.is_dict_like(new_repr) :
# merge new node_repr with reduce output
reduce_accum.update(new_repr)
new_repr = reduce_accum
self.set_n_repr(new_repr, v)
def apply_edges(self, u, v, apply_edge_func="default"):
......@@ -1039,7 +1061,6 @@ class DGLGraph(object):
# no edges to be triggered
assert len(v) == 0
return
unique_v = utils.toindex(F.unique(v.tousertensor()))
if message_func == "default":
message_func = self._message_func
......@@ -1052,11 +1073,37 @@ class DGLGraph(object):
'send_and_recv', self, src=u, dst=v,
message_func=message_func, reduce_func=reduce_func)
if executor:
executor.run()
new_reprs = executor.run()
if not utils.is_dict_like(new_reprs):
new_reprs = {__REPR__: new_reprs}
unique_v = executor.recv_nodes
else:
self.send(u, v, message_func)
self.recv(unique_v, reduce_func, None)
self.apply_nodes(unique_v, apply_node_func)
# handle multiple message and reduce func
if isinstance(message_func, (tuple, list)):
message_func = BundledMessageFunction(message_func)
if isinstance(reduce_func, (list, tuple)):
reduce_func = BundledReduceFunction(reduce_func)
# message func
u, v = utils.edge_broadcasting(u, v)
src_reprs = self.get_n_repr(u)
edge_reprs = self.get_e_repr(u, v)
msgs = message_func(src_reprs, edge_reprs)
msg_frame = FrameRef()
if utils.is_dict_like(msgs):
msg_frame.append(msgs)
else:
msg_frame.append({__MSG__: msgs})
# recv with degree bucketing
executor = scheduler.get_recv_executor(graph=self,
reduce_func=reduce_func,
message_frame=msg_frame,
edges=(u, v))
new_reprs = executor.run()
unique_v = executor.recv_nodes
self._apply_nodes(unique_v, apply_node_func, reduce_accum=new_reprs)
def pull(self,
v,
......@@ -1134,11 +1181,13 @@ class DGLGraph(object):
executor = scheduler.get_executor(
"update_all", self, message_func=message_func, reduce_func=reduce_func)
if executor:
executor.run()
new_reprs = executor.run()
if not utils.is_dict_like(new_reprs):
new_reprs = {__REPR__: new_reprs}
self._apply_nodes(ALL, apply_node_func, reduce_accum=new_reprs)
else:
self.send(ALL, ALL, message_func)
self.recv(ALL, reduce_func, None)
self.apply_nodes(ALL, apply_node_func)
self.recv(ALL, reduce_func, apply_node_func)
def propagate(self,
traverser='topo',
......
......@@ -3,13 +3,16 @@ from __future__ import absolute_import
import numpy as np
from .base import ALL
from .base import ALL, __MSG__, __REPR__
from . import backend as F
from .function import message as fmsg
from .function import reducer as fred
from . import utils
from collections import defaultdict as ddict
__all__ = ["degree_bucketing", "get_executor"]
from ._ffi.function import _init_api
__all__ = ["degree_bucketing", "get_recv_executor", "get_executor"]
def degree_bucketing(graph, v):
"""Create degree bucketing scheduling policy.
......@@ -39,6 +42,73 @@ def degree_bucketing(graph, v):
#print('degree-bucketing:', unique_degrees, [len(b) for b in v_bkt])
return unique_degrees, v_bkt
def _process_buckets(buckets):
    """Unpack the degree-bucketing auxiliary data returned by the C API.

    Parameters
    ----------
    buckets : PackedFunc
        Indexable result from the C API; position i yields the i-th
        schedule array (degrees, nodes, node sections, message ids,
        message sections).

    Returns
    -------
    (unique_v, degrees, dsts, msg_ids) : tuple
        Unique destination nodes, per-bucket degrees, and the node / message
        id buckets as lists of utils.Index.
    """
    degrees = utils.toindex(buckets(0))
    node_tensor = utils.toindex(buckets(1)).tousertensor()
    msg_tensor = utils.toindex(buckets(3)).tousertensor()
    # TODO: convert directly from ndarray to python list?
    node_section = buckets(2).asnumpy().tolist()
    msg_section = buckets(4).asnumpy().tolist()
    # slice the flat id arrays into one chunk per bucket, then wrap each
    # chunk back into a utils.Index
    dst_buckets = [utils.toindex(chunk)
                   for chunk in F.unpack(node_tensor, node_section)]
    mid_buckets = [utils.toindex(chunk)
                   for chunk in F.unpack(msg_tensor, msg_section)]
    return utils.toindex(node_tensor), degrees, dst_buckets, mid_buckets
def light_degree_bucketing(v):
    """Compute the degree-bucketing schedule for message destination nodes.

    Parameters
    ----------
    v : utils.Index
        Destination node for each message.

    Returns
    -------
    unique_v : utils.Index
        Unique destination nodes.
    degrees : utils.Index
        A list of degree for each bucket.
    v_bkt : list of utils.Index
        A list of node id buckets; nodes in each bucket have the same degree.
    msg_ids : list of utils.Index
        A list of message id buckets; each node in the ith node id bucket has
        degree[i] messages in the ith message id bucket.
    """
    raw_buckets = _CAPI_DGLDegreeBucketing(v.todgltensor())
    return _process_buckets(raw_buckets)
def light_degree_bucketing_for_graph(graph):
    """Compute the degree-bucketing schedule for the entire graph.

    Parameters
    ----------
    graph : GraphIndex
        The graph whose full edge set defines the messages.

    Returns
    -------
    unique_v : utils.Index
        Unique destination nodes.
    degrees : utils.Index
        A list of degree for each bucket.
    v_bkt : list of utils.Index
        A list of node id buckets; nodes in each bucket have the same degree.
    msg_ids : list of utils.Index
        A list of message id buckets; each node in the ith node id bucket has
        degree[i] messages in the ith message id bucket.
    """
    # Bug fix: this is a free function -- use the `graph` parameter, not the
    # nonexistent `self` (the original raised NameError when called).
    buckets = _CAPI_DGLDegreeBucketingFromGraph(graph._handle)
    return _process_buckets(buckets)
class Executor(object):
def run(self):
......@@ -78,6 +148,55 @@ class SPMVOperator(Executor):
return {self.dst_field : dstcol}
# FIXME: refactorize in scheduler/executor redesign
class DegreeBucketingExecutor(Executor):
    """Executor for the recv phase using degree bucketing.

    Destinations with the same in-degree are grouped into buckets so that the
    reduce function can run once per bucket on densely batched messages.
    """

    def __init__(self, g, rfunc, message_frame, edges=None):
        """
        Parameters
        ----------
        g : DGLGraph
            Graph on which recv is performed.
        rfunc : callable
            The reduce function.
        message_frame : FrameRef
            Frame holding the pending messages.
        edges : tuple of utils.Index, optional
            (src, dst) of the triggered edges; if None, the schedule covers
            all edges of the graph.
        """
        self.g = g
        self.rfunc = rfunc
        self.msg_frame = message_frame
        # calc degree bucketing schedule
        if edges is not None:
            unique_v, degs, dsts, msg_ids = light_degree_bucketing(edges[1])
        else:
            unique_v, degs, dsts, msg_ids = light_degree_bucketing_for_graph(g._graph)
        self._recv_nodes = unique_v
        self.degrees = degs
        self.dsts = dsts
        self.msg_ids = msg_ids

    @property
    def recv_nodes(self):
        """utils.Index: the unique nodes that receive at least one message."""
        return self._recv_nodes

    def run(self):
        """Reduce messages bucket by bucket and return the merged node repr dict."""
        new_reprs = []
        # loop over each bucket
        # FIXME (lingfan): handle zero-degree case
        for deg, vv, msg_id in zip(self.degrees, self.dsts, self.msg_ids):
            dst_reprs = self.g.get_n_repr(vv)
            in_msgs = self.msg_frame.select_rows(msg_id)

            def _reshape_fn(msg):
                # batch messages as (num_dst_nodes, degree, *feature_shape)
                msg_shape = F.shape(msg)
                new_shape = (len(vv), deg) + msg_shape[1:]
                return F.reshape(msg, new_shape)

            if len(in_msgs) == 1 and __MSG__ in in_msgs:
                reshaped_in_msgs = _reshape_fn(in_msgs[__MSG__])
            else:
                # NOTE(review): the lambda closes over the loop-local in_msgs;
                # assumes rfunc consumes the LazyDict within this iteration.
                reshaped_in_msgs = utils.LazyDict(
                    lambda key: _reshape_fn(in_msgs[key]), self.msg_frame.schemes)
            new_reprs.append(self.rfunc(dst_reprs, reshaped_in_msgs))

        # Pack all reducer results together.
        if utils.is_dict_like(new_reprs[0]):
            keys = new_reprs[0].keys()
            # `rpr` (not `repr`): avoid shadowing the builtin repr()
            new_reprs = {key: F.pack([rpr[key] for rpr in new_reprs])
                         for key in keys}
        else:
            new_reprs = {__REPR__: F.pack(new_reprs)}
        return new_reprs
class BasicExecutor(Executor):
def __init__(self, graph, mfunc, rfunc):
self.g = graph
......@@ -92,7 +211,7 @@ class BasicExecutor(Executor):
raise NotImplementedError
@property
def graph_mapping(self):
def recv_nodes(self):
raise NotImplementedError
def _build_exec(self, mfunc, rfunc):
......@@ -115,8 +234,7 @@ class BasicExecutor(Executor):
return exe
def run(self):
attr = self.exe.run()
self.g.set_n_repr(attr, self.graph_mapping)
return self.exe.run()
class UpdateAllExecutor(BasicExecutor):
......@@ -129,7 +247,7 @@ class UpdateAllExecutor(BasicExecutor):
self._edge_repr = None
self._graph_idx = None
self._graph_shape = None
self._graph_mapping = None
self._recv_nodes = None
@property
def graph_idx(self):
......@@ -145,7 +263,7 @@ class UpdateAllExecutor(BasicExecutor):
return self._graph_shape
@property
def graph_mapping(self):
def recv_nodes(self):
return ALL
@property
......@@ -186,7 +304,7 @@ class SendRecvExecutor(BasicExecutor):
self._edge_repr = None
self._graph_idx = None
self._graph_shape = None
self._graph_mapping = None
self._recv_nodes = None
@property
def graph_idx(self):
......@@ -201,10 +319,10 @@ class SendRecvExecutor(BasicExecutor):
return self._graph_shape
@property
def graph_mapping(self):
if self._graph_mapping is None:
def recv_nodes(self):
if self._recv_nodes is None:
self._build_adjmat()
return self._graph_mapping
return self._recv_nodes
@property
def node_repr(self):
......@@ -229,7 +347,7 @@ class SendRecvExecutor(BasicExecutor):
m = len(new2old)
self._graph_idx = F.pack([F.unsqueeze(new_v, 0), F.unsqueeze(u, 0)])
self._graph_shape = [m, n]
self._graph_mapping = new2old
self._recv_nodes = new2old
def _adj_build_fn(self, edge_field, ctx, use_edge_feat):
if use_edge_feat:
......@@ -283,7 +401,7 @@ class BundledExecutor(BasicExecutor):
else:
# attr and res must be dict
attr.update(res)
self.g.set_n_repr(attr, self.graph_mapping)
return attr
class BundledUpdateAllExecutor(BundledExecutor, UpdateAllExecutor):
......@@ -342,3 +460,24 @@ def get_executor(call_type, graph, **kwargs):
return _create_send_and_recv_exec(graph, **kwargs)
else:
return None
def get_recv_executor(graph, reduce_func, message_frame, edges=None):
    """Build the executor that carries out the recv phase.

    Parameters
    ----------
    graph : DGLGraph
        DGLGraph on which to perform recv.
    reduce_func : callable
        The reduce function.
    message_frame : FrameRef
        Message frame.
    edges : tuple/list of utils.Index, optional
        src and dst Index representing edges along which messages are sent.
        If not specified, all edges of graph are used instead.
    """
    # FIXME: handle builtin spmv executor case
    executor = DegreeBucketingExecutor(graph, reduce_func, message_frame, edges)
    return executor
_init_api("dgl.scheduler")
......@@ -23,7 +23,7 @@ class Index(object):
if not (F.dtype(data) == F.int64 and len(F.shape(data)) == 1):
raise ValueError('Index data must be 1D int64 vector, but got: %s' % str(data))
self._user_tensor_data[F.get_context(data)] = data
elif isinstance(data, nd.NDArray):
elif isinstance(data, nd.NDArray):
if not (data.dtype == 'int64' and len(data.shape) == 1):
raise ValueError('Index data must be 1D int64 vector, but got: %s' % str(data))
self._dgl_tensor_data = data
......@@ -168,6 +168,34 @@ class LazyDict(Mapping):
def __len__(self):
return len(self._keys)
class HybridDict(Mapping):
    """A read-only dictionary that merges several dict-likes (python dict, LazyDict).

    If there are duplicate keys, earlier sources take priority over later ones.
    """

    def __init__(self, *dict_like_list):
        self._dict_like_list = dict_like_list
        self._keys = None  # lazily computed list of merged keys

    def keys(self):
        if self._keys is None:
            # Bug fix: sum() joins with "+", which set does not support
            # (TypeError); build the union explicitly instead.
            merged = set()
            for d in self._dict_like_list:
                merged.update(d.keys())
            self._keys = list(merged)
        return self._keys

    def __getitem__(self, key):
        for d in self._dict_like_list:
            if key in d:
                return d[key]
        # Mapping contract: a missing key must raise KeyError, not return None
        # (also makes `key in self` via Mapping.get() behave correctly).
        raise KeyError(key)

    def __contains__(self, key):
        return key in self.keys()

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())
class ReadOnlyDict(Mapping):
"""A readonly dictionary wrapper."""
def __init__(self, dict_like):
......
#include "c_api_common.h"
using tvm::runtime::TVMArgs;
using tvm::runtime::TVMArgValue;
using tvm::runtime::TVMRetValue;
using tvm::runtime::PackedFunc;
using tvm::runtime::NDArray;
namespace dgl {
// Wrap the DLTensor carried by `arg` in a non-owning DLManagedTensor.
// The deleter frees only the wrapper struct, never the tensor memory, so the
// caller must keep the underlying buffer alive while the wrapper is in use.
DLManagedTensor* CreateTmpDLManagedTensor(const TVMArgValue& arg) {
  const DLTensor* tensor = arg;
  DLManagedTensor* wrapper = new DLManagedTensor();
  wrapper->dl_tensor = *tensor;        // shallow copy: shares the data pointer
  wrapper->manager_ctx = nullptr;      // no external manager
  wrapper->deleter = [] (DLManagedTensor* self) { delete self; };
  return wrapper;
}
// Pack a vector of NDArrays into a PackedFunc: calling the result with an
// integer i returns vec[i]; out-of-range indices are a fatal error.
// The vector is captured by value so the returned function owns its data.
PackedFunc ConvertNDArrayVectorToPackedFunc(const std::vector<NDArray>& vec) {
  auto body = [vec](TVMArgs args, TVMRetValue* rv) {
    const size_t which = args[0];
    if (which >= vec.size()) {
      LOG(FATAL) << "invalid choice";
    } else {
      // The lambda is non-mutable, so the captured vector's elements are
      // const; std::move here was a copy anyway and misleadingly suggested
      // the cached array was consumed. Plain copy of the ref-counted handle.
      *rv = vec[which];
    }
  };
  return PackedFunc(body);
}
} // namespace dgl
// DGL C API common util functions
#ifndef DGL_C_API_COMMON_H_
#define DGL_C_API_COMMON_H_

#include <dgl/runtime/ndarray.h>
#include <dgl/runtime/packed_func.h>
#include <dgl/runtime/registry.h>

#include <vector>

namespace dgl {

// Graph handler type: opaque pointer passed across the C API boundary.
typedef void* GraphHandle;

// Convert the given DLTensor to a temporary DLManagedTensor that does not own memory.
// The caller must keep the underlying buffer alive while the wrapper is used;
// the wrapper's deleter frees only the wrapper struct itself.
DLManagedTensor* CreateTmpDLManagedTensor(const tvm::runtime::TVMArgValue& arg);

// Convert a vector of NDArray to PackedFunc.
// Calling the returned function with an integer i yields the i-th array.
tvm::runtime::PackedFunc ConvertNDArrayVectorToPackedFunc(const std::vector<tvm::runtime::NDArray>& vec);

}  // namespace dgl

#endif  // DGL_C_API_COMMON_H_
#include <dgl/runtime/packed_func.h>
#include <dgl/runtime/registry.h>
#include <dgl/graph.h>
#include <dgl/graph_op.h>
#include "../c_api_common.h"
using tvm::runtime::TVMArgs;
using tvm::runtime::TVMArgValue;
......@@ -11,9 +10,6 @@ using tvm::runtime::NDArray;
namespace dgl {
// Graph handler type
typedef void* GraphHandle;
namespace {
// Convert EdgeArray structure to PackedFunc.
PackedFunc ConvertEdgeArrayToPackedFunc(const Graph::EdgeArray& ea) {
......@@ -52,16 +48,6 @@ PackedFunc ConvertSubgraphToPackedFunc(const Subgraph& sg) {
return PackedFunc(body);
}
// Convert the given DLTensor to a temporary DLManagedTensor that does not own memory.
DLManagedTensor* CreateTmpDLManagedTensor(const TVMArgValue& arg) {
const DLTensor* dl_tensor = arg;
DLManagedTensor* ret = new DLManagedTensor();
ret->deleter = [] (DLManagedTensor* self) { delete self; };
ret->manager_ctx = nullptr;
ret->dl_tensor = *dl_tensor;
return ret;
}
} // namespace
TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCreate")
......
// DGL Scheduler implementation
#include <unordered_map>
#include <vector>
#include <dgl/scheduler.h>
namespace dgl {
namespace sched {
std::vector<IdArray> DegreeBucketing(const IdArray& vids) {
const auto n_msgs = vids->shape[0];
const int64_t* vid_data = static_cast<int64_t*>(vids->data);
// inedge: dst->msgs
std::unordered_map<int64_t, std::vector<int64_t>> in_edges;
for (int64_t mid = 0; mid < n_msgs; ++mid) {
in_edges[vid_data[mid]].push_back(mid);
}
// bkt: deg->dsts
std::unordered_map<int64_t, std::vector<int64_t>> bkt;
for (auto& it: in_edges) {
bkt[it.second.size()].push_back(it.first);
}
// initialize output
int64_t n_deg = bkt.size();
int64_t n_dst = in_edges.size();
IdArray degs = IdArray::Empty({n_deg}, vids->dtype, vids->ctx);
IdArray nids = IdArray::Empty({n_dst}, vids->dtype, vids->ctx);
IdArray nid_section = IdArray::Empty({n_deg}, vids->dtype, vids->ctx);
IdArray mids = IdArray::Empty({n_msgs}, vids->dtype, vids->ctx);
IdArray mid_section = IdArray::Empty({n_deg}, vids->dtype, vids->ctx);
int64_t* deg_ptr = static_cast<int64_t*>(degs->data);
int64_t* nid_ptr = static_cast<int64_t*>(nids->data);
int64_t* nsec_ptr = static_cast<int64_t*>(nid_section->data);
int64_t* mid_ptr = static_cast<int64_t*>(mids->data);
int64_t* msec_ptr = static_cast<int64_t*>(mid_section->data);
// fill in bucketing ordering
for (auto& it: bkt) { // for each bucket
int64_t deg = it.first;
int64_t n_dst = it.second.size();
*deg_ptr++ = deg;
*nsec_ptr++ = n_dst;
*msec_ptr++ = deg * n_dst;
for (auto dst: it.second) { // for each dst in this bucket
*nid_ptr++ = dst;
for (auto mid: in_edges[dst]) { // for each in edge of dst
*mid_ptr++ = mid;
}
}
}
std::vector<IdArray> ret;
ret.push_back(std::move(degs));
ret.push_back(std::move(nids));
ret.push_back(std::move(nid_section));
ret.push_back(std::move(mids));
ret.push_back(std::move(mid_section));
return std::move(ret);
}
} // namespace sched
} // namespace dgl
#include "../c_api_common.h"
#include <dgl/graph.h>
#include <dgl/scheduler.h>
using tvm::runtime::TVMArgs;
using tvm::runtime::TVMRetValue;
using tvm::runtime::NDArray;
namespace dgl {
// Expose sched::DegreeBucketing to the FFI: takes a tensor of per-message
// destination vertex ids; returns the 5-array schedule wrapped in a
// PackedFunc (calling it with index i fetches the i-th array).
TVM_REGISTER_GLOBAL("scheduler._CAPI_DGLDegreeBucketing")
.set_body([] (TVMArgs args, TVMRetValue* rv) {
    const IdArray vids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[0]));
    *rv = ConvertNDArrayVectorToPackedFunc(sched::DegreeBucketing(vids));
  });

// Same schedule, computed over every edge of an existing graph: the dst
// array of gptr->Edges(false) supplies the per-message destinations.
// NOTE(review): assumes Edges(false) yields the unsorted edge list -- verify
// against Graph::Edges.
TVM_REGISTER_GLOBAL("scheduler._CAPI_DGLDegreeBucketingFromGraph")
.set_body([] (TVMArgs args, TVMRetValue* rv) {
    GraphHandle ghandle = args[0];
    const Graph* gptr = static_cast<Graph*>(ghandle);
    auto edges = gptr->Edges(false);
    *rv = ConvertNDArrayVectorToPackedFunc(sched::DegreeBucketing(edges.dst));
  });
} // namespace dgl
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment