Commit 5b9147c4 authored by VoVAllen's avatar VoVAllen Committed by Lingfan Yu

[Feature] Edge Group Apply API (#358)

* add rtfd

* rrr

* update

* change env

* temp fix

* update

* fix

* fix

* add

* conf

* Move file_pattern from Makefile to conf.py

* remove yml

* fix

* fix

* fix

* fix

* remove yml

* remove yml

* add doc docker

* add dgl install script

* change name

* change dockerfile

* fix

* name

* add

* fix

* fix

* fix

* fix

* fix docker

* delete sphinx.py for doc-build backend

* Add softmax to test backend

* Add group apply function and tests

* Delete unnecessary file

* Update comments and test

* Fix lint

* remove unused bucketing code

* group apply edge bucketing code

* gen degree bucket schedule for group apply edge

* schedule and graph code

* fix compiling

* fix

* fix lint

* naming

* harder test case

* fix comments

* more comments

* tweak function name
parent 9cd0df7d
@@ -7,6 +7,7 @@
#define DGL_GRAPH_H_
#include <vector>
#include <string>
#include <cstdint>
#include <utility>
#include <tuple>
@@ -23,7 +23,7 @@ namespace sched {
* \note If there are multiple messages going into the same destination vertex, then
* there will be multiple copies of the destination vertex in vids
* \return a vector of 5 IdArrays for degree bucketing. The 5 arrays are:
* degrees: of degrees for each bucket
* degrees: degrees for each bucket
* nids: destination node ids
* nid_section: number of nodes in each bucket (used to split nids)
* mids: message ids
@@ -32,6 +32,26 @@ namespace sched {
std::vector<IdArray> DegreeBucketing(const IdArray& msg_ids, const IdArray& vids,
const IdArray& recv_ids);
/*!
* \brief Generate degree bucketing schedule for group_apply edge
* \param uids The grouping endpoint of each edge; edges are grouped by these nodes
* \param vids The other endpoint of each edge
* \param eids Edge ids
* \note This function always generates the group_apply schedule based on the
*       degrees of the nodes in uids. Therefore, to group_apply by source nodes,
*       uids should be the source nodes; to group_apply by destination nodes,
*       uids should be the destination nodes.
* \return a vector of 5 IdArrays for degree bucketing. The 5 arrays are:
* degrees: degrees for each bucket
* new_uids: uids reordered by degree bucket
* new_vids: vids reordered by degree bucket
* new_eids: eids reordered by degree bucket
* sections: number of edges in each degree bucket (used to partition
* new_uids, new_vids, and new_eids)
*/
std::vector<IdArray> GroupEdgeByNodeDegree(const IdArray& uids,
const IdArray& vids, const IdArray& eids);
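// Illustrative walk-through (an editorial sketch, not part of the original
// header): with uids = [0, 0, 3], vids = [1, 2, 4], eids = [7, 8, 9], node 0
// has degree 2 and node 3 has degree 1, giving two buckets. One valid output
// (bucket order is unspecified) is:
//   degrees  = [2, 1]
//   new_uids = [0, 0, 3]
//   new_vids = [1, 2, 4]
//   new_eids = [7, 8, 9]
//   sections = [2, 1]  // 2 edges in the degree-2 bucket, 1 in the degree-1 bucket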
} // namespace sched
} // namespace dgl
@@ -1923,6 +1923,90 @@ class DGLGraph(object):
inplace=inplace)
Runtime.run(prog)
def group_apply_edges(self, group_by, func, edges=ALL, inplace=False):
"""Group the edges by nodes and apply the function on the grouped edges to
update their features.
Parameters
----------
group_by : str
Specify how to group edges. Expected to be either 'src' or 'dst'
func : callable
Apply function on the grouped edges. The function must be
an :mod:`Edge UDF <dgl.udf>`. The input features of the UDF have
shape (bucket_size, degree, *feature_shape), and it must return
a dict whose tensor values have the same shapes.
edges : valid edges type, optional
Edges on which to group and apply ``func``. See :func:`send` for valid
edges type. Default is all the edges.
inplace: bool, optional
If True, update will be done in place, but autograd will break.
Notes
-----
On multigraphs, if :math:`u` and :math:`v` are specified, then all the edges
between :math:`u` and :math:`v` will be updated.
Examples
--------
.. note:: Here we use pytorch syntax for demo. The general idea applies
to other frameworks with minor syntax change (e.g. replace
``torch.tensor`` with ``mxnet.ndarray``).
>>> import torch as th
>>> g = dgl.DGLGraph()
>>> g.add_nodes(4)
>>> g.add_edges(0, [1, 2, 3])
>>> g.add_edges(1, [2, 3])
>>> g.add_edges(2, [2, 3])
>>> g.edata['feat'] = th.randn((g.number_of_edges(), 1))
>>> # Softmax over the out edges of each node
>>> # Second dimension of edges.data is the degree dimension
>>> def softmax_feat(edges): return {'norm_feat': th.softmax(edges.data['feat'], dim=1)}
>>> g.group_apply_edges(func=softmax_feat, group_by='src') # Apply func to each group of out-edges
>>> u, v, eid = g.out_edges(1, form='all')
>>> in_feat = g.edata['feat'][eid]
>>> out_feat = g.edata['norm_feat'][eid]
>>> print(out_feat - th.softmax(in_feat, 0))
tensor([[0.],
[0.]])
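>>> # A hypothetical follow-up (not in the original example): grouping by
>>> # 'dst' instead normalizes over each node's in-edges.
>>> g.group_apply_edges(func=softmax_feat, group_by='dst')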
See Also
--------
apply_edges
"""
assert func is not None
if group_by not in ('src', 'dst'):
raise DGLError("Group_by should be either src or dst")
if is_all(edges):
u, v, _ = self._graph.edges()
eid = utils.toindex(slice(0, self.number_of_edges()))
elif isinstance(edges, tuple):
u, v = edges
u = utils.toindex(u)
v = utils.toindex(v)
# Rewrite u, v to handle edge broadcasting and multigraph.
u, v, eid = self._graph.edge_ids(u, v)
else:
eid = utils.toindex(edges)
u, v, _ = self._graph.find_edges(eid)
with ir.prog() as prog:
scheduler.schedule_group_apply_edge(graph=self,
u=u,
v=v,
eid=eid,
apply_func=func,
group_by=group_by,
inplace=inplace)
Runtime.run(prog)
def send(self, edges=ALL, message_func="default"):
"""Send messages along the given edges.
@@ -2,9 +2,8 @@
from __future__ import absolute_import
from .._ffi.function import _init_api
from ..base import is_all
from .. import backend as F
from ..udf import NodeBatch
from ..udf import NodeBatch, EdgeBatch
from .. import utils
from . import ir
@@ -98,41 +97,9 @@ def _degree_bucketing_schedule(mids, dsts, v):
"""
buckets = _CAPI_DGLDegreeBucketing(mids.todgltensor(), dsts.todgltensor(),
v.todgltensor())
return _process_buckets(buckets)
return _process_node_buckets(buckets)
def _degree_bucketing_for_edges(dsts):
"""Return the bucketing by degree scheduling for destination nodes of
messages
Parameters
----------
dsts: utils.Index
destination node for each message
"""
buckets = _CAPI_DGLDegreeBucketingForEdges(dsts.todgltensor())
return _process_buckets(buckets)
def _degree_bucketing_for_graph(graph, v):
"""Return the bucketing by degree scheduling given graph index and optional
dst nodes
Parameters:
-----------
graph: GraphIndex
DGLGraph Index (update all case) or message graph index (recv cases)
v: utils.Index
Destination nodes (recv cases)
"""
if is_all(v):
buckets = _CAPI_DGLDegreeBucketingForFullGraph(graph._handle)
else:
buckets = _CAPI_DGLDegreeBucketingForRecvNodes(graph._handle,
v.todgltensor())
return _process_buckets(buckets)
def _process_buckets(buckets):
def _process_node_buckets(buckets):
"""read bucketing auxiliary data
Returns
@@ -189,4 +156,152 @@ def _create_per_bkt_rfunc(graph, reduce_udf, deg, vbkt):
return reduce_udf(nbatch)
return _rfunc_wrapper
def gen_group_apply_edge_schedule(
graph,
apply_func,
u, v, eid,
group_by,
var_nf,
var_ef,
var_out):
"""Create degree bucketing schedule for group_apply_edge
Edges are grouped by either their source or destination nodes, as
specified by 'group_by', and are divided into buckets in which the
'group_by' nodes have the same degree. The apply_func UDF is applied to
each bucket, and the per-bucket results are merged according to the
*unique-ascending order* of the edge ids.
Parameters
----------
graph : DGLGraph
DGLGraph to use
apply_func: callable
The edge_apply_func UDF
u: utils.Index
Source nodes of edges to apply
v: utils.Index
Destination nodes of edges to apply
eid: utils.Index
Edges to apply
group_by: str
If "src", group by u. If "dst", group by v
var_nf : var.FEAT_DICT
The variable for node feature frame.
var_ef : var.FEAT_DICT
The variable for edge frame.
var_out : var.FEAT_DICT
The variable for output feature dicts.
"""
if group_by == "src":
buckets = _degree_bucketing_for_edge_grouping(u, v, eid)
degs, uids, vids, eids = buckets
elif group_by == "dst":
buckets = _degree_bucketing_for_edge_grouping(v, u, eid)
degs, vids, uids, eids = buckets
else:
raise DGLError("group_apply_edge must be grouped by either src or dst")
idx_list = []
fd_list = []
for deg, u_bkt, v_bkt, eid_bkt in zip(degs, uids, vids, eids):
# create per-bkt efunc
_efunc = var.FUNC(_create_per_bkt_efunc(graph, apply_func, deg,
u_bkt, v_bkt, eid_bkt))
# vars
var_u = var.IDX(u_bkt)
var_v = var.IDX(v_bkt)
var_eid = var.IDX(eid_bkt)
# apply edge UDF on each bucket
fdsrc = ir.READ_ROW(var_nf, var_u)
fddst = ir.READ_ROW(var_nf, var_v)
fdedge = ir.READ_ROW(var_ef, var_eid)
fdedge = ir.EDGE_UDF(_efunc, fdsrc, fdedge, fddst, ret=fdedge) # reuse var
# save for merge
idx_list.append(var_eid)
fd_list.append(fdedge)
# merge buckets according to the ascending order of the edge ids.
all_idx = F.cat([idx.data.tousertensor() for idx in idx_list], dim=0)
_, order = F.sort_1d(all_idx)
var_order = var.IDX(utils.toindex(order))
ir.MERGE_ROW(var_order, fd_list, ret=var_out)
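# A minimal usage sketch of the schedule above, assuming a PyTorch backend
# (an editorial illustration, not part of the original module):
#
#   import torch as th
#   import dgl
#   g = dgl.DGLGraph()
#   g.add_nodes(3)
#   g.add_edges(0, [1, 2])  # out-degree 2 -> degree-2 bucket
#   g.add_edges(1, [2])     # out-degree 1 -> degree-1 bucket
#   g.edata['feat'] = th.randn(3, 4)
#   g.group_apply_edges('src', lambda e: {'feat': 2 * e.data['feat']})
#
# Each bucket is processed independently by the edge UDF; MERGE_ROW then
# stitches the per-bucket outputs back into ascending edge-id order before
# the final write to the edge frame.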
def _degree_bucketing_for_edge_grouping(uids, vids, eids):
"""Return the edge buckets by degree and grouped nodes for group_apply_edge
Parameters
----------
uids: utils.Index
node ids of one endpoint of the edges; edges are grouped by these nodes
vids: utils.Index
node ids of the other endpoint of the edges
eids: utils.Index
edge id for each edge
"""
buckets = _CAPI_DGLGroupEdgeByNodeDegree(uids.todgltensor(),
vids.todgltensor(),
eids.todgltensor())
return _process_edge_buckets(buckets)
def _process_edge_buckets(buckets):
"""read bucketing auxiliary data for group_apply_edge buckets
Returns
-------
degrees: numpy.ndarray
A list of degrees, one for each bucket
uids: list of utils.Index
A list of node id buckets, nodes in each bucket have the same degree
vids: list of utils.Index
A list of node id buckets
eids: list of utils.Index
A list of edge id buckets
"""
# get back results
degs = buckets(0).asnumpy()
uids = utils.toindex(buckets(1))
vids = utils.toindex(buckets(2))
eids = utils.toindex(buckets(3))
# XXX: convert directly from ndarray to python list?
sections = buckets(4).asnumpy().tolist()
# split buckets and convert to index
def split(to_split):
res = F.split(to_split.tousertensor(), sections, 0)
return map(utils.toindex, res)
uids = split(uids)
vids = split(vids)
eids = split(eids)
return degs, uids, vids, eids
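# A worked sketch (editorial, not from the original source): suppose
# degs = [2, 1] and sections = [4, 2], i.e. two degree-2 nodes contribute
# four edges and two degree-1 nodes contribute two. split() then cuts the
# six-element uids/vids/eids arrays into an Index of length 4 followed by
# an Index of length 2, one per degree bucket.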
def _create_per_bkt_efunc(graph, apply_func, deg, u, v, eid):
"""Internal function to generate the per degree bucket edge UDF."""
batch_size = len(u) // deg
def _efunc_wrapper(src_data, edge_data, dst_data):
def _reshape_func(data):
def _reshaped_getter(key):
feat = data[key]
new_shape = (batch_size, deg) + F.shape(feat)[1:]
return F.reshape(feat, new_shape)
return _reshaped_getter
def _reshape_back(data):
shape = F.shape(data)[2:]
new_shape = (batch_size * deg,) + shape
return F.reshape(data, new_shape)
reshaped_src_data = utils.LazyDict(_reshape_func(src_data),
src_data.keys())
reshaped_edge_data = utils.LazyDict(_reshape_func(edge_data),
edge_data.keys())
reshaped_dst_data = utils.LazyDict(_reshape_func(dst_data),
dst_data.keys())
ebatch = EdgeBatch(graph, (u, v, eid), reshaped_src_data,
reshaped_edge_data, reshaped_dst_data)
return {k: _reshape_back(v) for k, v in apply_func(ebatch).items()}
return _efunc_wrapper
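# Shape walk-through for the wrapper above (editorial note): for a bucket
# with deg = 3 covering len(u) = 6 edges, batch_size = 2. A per-edge feature
# of shape (6, D) is lazily viewed as (2, 3, D) before apply_func runs, so
# the UDF can reduce or normalize along the degree dimension (dim 1), and
# each returned tensor is reshaped back to (6, *feature_shape) for the edge
# frame update.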
_init_api("dgl.runtime.degree_bucketing")
@@ -21,6 +21,7 @@ __all__ = [
"schedule_snr",
"schedule_apply_nodes",
"schedule_apply_edges",
"schedule_group_apply_edge",
"schedule_push",
"schedule_pull"
]
@@ -360,6 +361,49 @@ def schedule_pull(graph,
else:
ir.WRITE_ROW_(var_nf, var_pull_nodes, final_feat)
def schedule_group_apply_edge(graph,
u, v, eid,
apply_func,
group_by,
inplace):
"""group apply edges schedule
Parameters
----------
graph: DGLGraph
The DGLGraph to use
u : utils.Index
Source nodes of edges to apply
v : utils.Index
Destination nodes of edges to apply
eid : utils.Index
Ids of the edges to apply
apply_func: callable
The apply edge function
group_by : str
Specify how to group edges. Expected to be either 'src' or 'dst'
inplace: bool
If True, the update will be done in place
"""
# vars
var_nf = var.FEAT_DICT(graph._node_frame, name='nf')
var_ef = var.FEAT_DICT(graph._edge_frame, name='ef')
var_out = var.FEAT_DICT(name='new_ef')
# TODO (lingfan): check if apply_func is a DGL builtin
db.gen_group_apply_edge_schedule(graph, apply_func, u, v, eid, group_by,
var_nf, var_ef, var_out)
var_eid = var.IDX(eid)
if inplace:
ir.WRITE_ROW_INPLACE_(var_ef, var_eid, var_out)
else:
ir.WRITE_ROW_(var_ef, var_eid, var_out)
def _check_builtin_func_list(func_list):
"""Check whether func_list only contains builtin functions."""
for fn in func_list:
@@ -92,6 +92,68 @@ std::vector<IdArray> DegreeBucketing(const IdArray& msg_ids, const IdArray& vids
return std::move(ret);
}
std::vector<IdArray> GroupEdgeByNodeDegree(const IdArray& uids, const IdArray& vids,
const IdArray& eids) {
auto n_edge = eids->shape[0];
const int64_t* eid_data = static_cast<int64_t*>(eids->data);
const int64_t* uid_data = static_cast<int64_t*>(uids->data);
const int64_t* vid_data = static_cast<int64_t*>(vids->data);
// node2edge: group_by nodes uid -> (eid, the other end vid)
std::unordered_map<int64_t,
std::vector<std::pair<int64_t, int64_t>>> node2edge;
for (int64_t i = 0; i < n_edge; ++i) {
node2edge[uid_data[i]].emplace_back(eid_data[i], vid_data[i]);
}
// bkt: deg -> group_by node uid
std::unordered_map<int64_t, std::vector<int64_t>> bkt;
for (const auto& it : node2edge) {
bkt[it.second.size()].push_back(it.first);
}
// number of unique degrees
int64_t n_deg = bkt.size();
// initialize output
IdArray degs = IdArray::Empty({n_deg}, eids->dtype, eids->ctx);
IdArray new_uids = IdArray::Empty({n_edge}, uids->dtype, uids->ctx);
IdArray new_vids = IdArray::Empty({n_edge}, vids->dtype, vids->ctx);
IdArray new_eids = IdArray::Empty({n_edge}, eids->dtype, eids->ctx);
IdArray sections = IdArray::Empty({n_deg}, eids->dtype, eids->ctx);
int64_t* deg_ptr = static_cast<int64_t*>(degs->data);
int64_t* uid_ptr = static_cast<int64_t*>(new_uids->data);
int64_t* vid_ptr = static_cast<int64_t*>(new_vids->data);
int64_t* eid_ptr = static_cast<int64_t*>(new_eids->data);
int64_t* sec_ptr = static_cast<int64_t*>(sections->data);
// fill in bucketing ordering
for (const auto& it : bkt) { // for each bucket
// degree of this bucket
const int64_t deg = it.first;
// number of edges in this bucket
const int64_t bucket_size = it.second.size();
*deg_ptr++ = deg;
*sec_ptr++ = deg * bucket_size;
for (const auto u : it.second) { // for uid in this bucket
for (const auto& pair : node2edge[u]) { // for each edge of uid
*uid_ptr++ = u;
*vid_ptr++ = pair.second;
*eid_ptr++ = pair.first;
}
}
}
std::vector<IdArray> ret;
ret.push_back(std::move(degs));
ret.push_back(std::move(new_uids));
ret.push_back(std::move(new_vids));
ret.push_back(std::move(new_eids));
ret.push_back(std::move(sections));
return std::move(ret);
}
} // namespace sched
} // namespace dgl
@@ -18,43 +18,16 @@ DGL_REGISTER_GLOBAL("runtime.degree_bucketing._CAPI_DGLDegreeBucketing")
const IdArray msg_ids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[0]));
const IdArray vids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
const IdArray nids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[2]));
*rv = ConvertNDArrayVectorToPackedFunc(sched::DegreeBucketing(msg_ids, vids, nids));
});
DGL_REGISTER_GLOBAL("runtime.degree_bucketing._CAPI_DGLDegreeBucketingForEdges")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
const IdArray vids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[0]));
// XXX: better way to do arange?
int64_t n_msgs = vids->shape[0];
IdArray msg_ids = IdArray::Empty({n_msgs}, vids->dtype, vids->ctx);
int64_t* mid_data = static_cast<int64_t*>(msg_ids->data);
for (int64_t i = 0; i < n_msgs; ++i) {
mid_data[i] = i;
}
*rv = ConvertNDArrayVectorToPackedFunc(sched::DegreeBucketing(msg_ids, vids, vids));
});
DGL_REGISTER_GLOBAL("runtime.degree_bucketing._CAPI_DGLDegreeBucketingForRecvNodes")
DGL_REGISTER_GLOBAL("runtime.degree_bucketing._CAPI_DGLGroupEdgeByNodeDegree")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const Graph* gptr = static_cast<Graph*>(ghandle);
const IdArray uids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[0]));
const IdArray vids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
const auto& edges = gptr->InEdges(vids);
*rv = ConvertNDArrayVectorToPackedFunc(sched::DegreeBucketing(edges.id, edges.dst, vids));
const IdArray eids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[2]));
*rv = ConvertNDArrayVectorToPackedFunc(
sched::GroupEdgeByNodeDegree(uids, vids, eids));
});
DGL_REGISTER_GLOBAL("runtime.degree_bucketing._CAPI_DGLDegreeBucketingForFullGraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const Graph* gptr = static_cast<Graph*>(ghandle);
const auto& edges = gptr->Edges("");
int64_t n_vertices = gptr->NumVertices();
IdArray nids = IdArray::Empty({n_vertices}, edges.dst->dtype, edges.dst->ctx);
int64_t* nid_data = static_cast<int64_t*>(nids->data);
for (int64_t i = 0; i < n_vertices; ++i) {
nid_data[i] = i;
}
*rv = ConvertNDArrayVectorToPackedFunc(sched::DegreeBucketing(edges.id, edges.dst, nids));
});
} // namespace dgl
@@ -26,6 +26,7 @@ _tensor = tensor
_arange = arange
_full = full
_full_1d = full_1d
_softmax = softmax
_default_context_str = os.getenv('DGLTESTDEV', 'cpu')
_context_dict = {
'cpu': cpu(),
@@ -56,3 +57,6 @@ def full(shape, fill_value, dtype, ctx=_default_context):
def full_1d(length, fill_value, dtype, ctx=_default_context):
return _full_1d(length, fill_value, dtype, ctx)
def softmax(x, dim):
return _softmax(x, dim)
\ No newline at end of file
@@ -67,6 +67,10 @@ def reduce_sum(x):
"""Sums all the elements into a single scalar."""
pass
def softmax(x, dim):
"""Softmax Operation on Tensors"""
pass
###############################################################################
# Tensor functions used *only* on index tensor
# ----------------
@@ -45,6 +45,8 @@ def clone(x):
def reduce_sum(x):
return x.sum()
def softmax(x, dim):
return nd.softmax(x, dim)
record_grad = autograd.record
@@ -45,6 +45,8 @@ def clone(x):
def reduce_sum(x):
return x.sum()
def softmax(x, dim):
return th.softmax(x, dim)
class record_grad(object):
def __init__(self):
@@ -625,6 +625,40 @@ def test_repr():
repr_string = G.__repr__()
print(repr_string)
def test_group_apply_edges():
def edge_udf(edges):
h = F.sum(edges.data['feat'] * (edges.src['h'] + edges.dst['h']), dim=2)
normalized_feat = F.softmax(h, dim=1)
return {"norm_feat": normalized_feat}
g = DGLGraph()
g.add_nodes(10)
g.add_edges(0, [1, 2, 3, 4, 5, 6, 7, 8])
g.add_edges(1, [2, 3, 4, 6, 7, 8])
g.add_edges(2, [2, 3, 4, 5, 6, 7, 8])
g.ndata['h'] = F.randn((g.number_of_nodes(), D))
g.edata['feat'] = F.randn((g.number_of_edges(), D))
def _test(group_by):
g.group_apply_edges(group_by=group_by, func=edge_udf)
if group_by == 'src':
u, v, eid = g.out_edges(1, form='all')
else:
u, v, eid = g.in_edges(5, form='all')
out_feat = g.edata['norm_feat'][eid]
result = (g.ndata['h'][u] + g.ndata['h'][v]) * g.edata['feat'][eid]
result = F.softmax(F.sum(result, dim=1), dim=0)
assert F.allclose(out_feat, result)
# test group by source nodes
_test('src')
# test group by destination nodes
_test('dst')
if __name__ == '__main__':
test_nx_conversion()
test_batch_setter_getter()
@@ -641,3 +675,4 @@ if __name__ == '__main__':
test_send_multigraph()
test_dynamic_addition()
test_repr()
test_group_apply_edges()