"src/vscode:/vscode.git/clone" did not exist on "b38255006af56b98aab3a4027939e662ed343468"
Commit deb653f8 authored by Lingfan Yu's avatar Lingfan Yu Committed by Minjie Wang
Browse files

[Runtime] Scheduler and Executor (#140)

* executor api

* draft executor interface

* WIP

* revert changes to avoid conflict with api change

* core scheduling logic

* WIP: build graph adj

* incidence matrix for in edges

* support incidence matrix for partial recv nodes

* improve

* build adjmat in scheduler

* graph store

* get degree bucketing schedule

* connect to c++ degree bucketing

* conceptual executor creation code

* executor comments

* fix

* more executor comments

* WIP: full send_and_recv schedule

* most schedulers

* simplify scheduler

* executors

* runtime

* builtin function base class

* adj indices and shape

* completely refactor scheduler

* rename and move bundled out to function.py

* use_edge_feature in msg func

* rewrite scheduler

* node edge executor

* connect with graph api

* handle zero degree

* misc

* fix test cases

* fix a good many bugs...

* remove old scheduler

* push and pull

* fix send recv

* c++ lint

* fix batched send recv

* hot fix for mxnet

* typo

* write back executor

* apply node edge

* clean up, doc string

* fix as requested

* refactor

* fix

* WIP

* WIP

* ir draft

* more on ir

* WIP: spmv schedule

* WIP

* recv schedule

* refactor

* WIP

* snr degree bucketing

* snr scheduler

* move prog to graph.py; rename

* unittest for send/recv

* remove some legacy codes

* WIP: update_all

* pass test_basics

* passed all current utests

* more utests; fix mx utest

* WIP: fixing zero deg initial value

* some tests

* fix 0deg problem

* fix mx

* fix mx

* some notes

* fix as requested
parent 3e8b63ec
......@@ -17,7 +17,9 @@ namespace sched {
/*!
* \brief Generate degree bucketing schedule
* \param vids The destination vertex for messages
* \param msg_ids The edge id for each message
* \param vids The destination vertex for each message
* \param recv_ids The recv nodes (for checking zero degree nodes)
* \note If there are multiple messages going into the same destination vertex, then
* there will be multiple copies of the destination vertex in vids
* \return a vector of 5 IdArrays for degree bucketing. The 5 arrays are:
......@@ -27,7 +29,8 @@ namespace sched {
* mids: message ids
* mid_section: number of messages in each bucket (used to split mids)
*/
std::vector<IdArray> DegreeBucketing(const IdArray& vids);
std::vector<IdArray> DegreeBucketing(const IdArray& msg_ids, const IdArray& vids,
const IdArray& recv_ids);
} // namespace sched
......
......@@ -2,15 +2,15 @@
from __future__ import absolute_import
from collections import MutableMapping, namedtuple
import sys
import numpy as np
from . import backend as F
from .base import DGLError, dgl_warning
from .init import zero_initializer
from . import utils
import sys
class Scheme(namedtuple('Scheme', ['shape', 'dtype'])):
"""The column scheme.
......@@ -160,11 +160,6 @@ class Column(object):
else:
return Column(data)
def zero_initializer(shape, dtype, ctx):
return F.zeros(shape, dtype, ctx)
class Frame(MutableMapping):
"""The columnar storage for node/edge features.
......@@ -320,7 +315,8 @@ class Frame(MutableMapping):
if self.get_initializer(name) is None:
self._warn_and_set_initializer()
init_data = self.get_initializer(name)(
(self.num_rows,) + scheme.shape, scheme.dtype, ctx)
(self.num_rows,) + scheme.shape, scheme.dtype,
ctx, slice(0, self.num_rows))
self._columns[name] = Column(init_data, scheme)
def add_rows(self, num_rows):
......@@ -334,8 +330,6 @@ class Frame(MutableMapping):
num_rows : int
The number of new rows
"""
self._num_rows += num_rows
feat_placeholders = {}
for key, col in self._columns.items():
scheme = col.scheme
......@@ -343,10 +337,11 @@ class Frame(MutableMapping):
if self.get_initializer(key) is None:
self._warn_and_set_initializer()
new_data = self.get_initializer(key)(
(num_rows,) + scheme.shape, scheme.dtype, ctx)
(num_rows,) + scheme.shape, scheme.dtype,
ctx, slice(self._num_rows, self._num_rows + num_rows))
feat_placeholders[key] = new_data
self._append(Frame(feat_placeholders))
self._num_rows += num_rows
def update_column(self, name, data):
"""Add or replace the column with the given name and data.
......@@ -476,6 +471,21 @@ class FrameRef(MutableMapping):
"""
self._frame.set_initializer(initializer, column=column)
def get_initializer(self, column=None):
    """Get the initializer for empty values for the given column.

    This simply delegates to the wrapped frame's ``get_initializer``.

    Parameters
    ----------
    column : str, optional
        The column name. When None, presumably the frame-wide default
        initializer is returned — confirm against Frame.get_initializer.

    Returns
    -------
    callable
        The initializer.
    """
    return self._frame.get_initializer(column)
def index(self):
"""Return the index object.
......@@ -553,6 +563,12 @@ class FrameRef(MutableMapping):
"""
if isinstance(key, str):
return self.select_column(key)
elif isinstance(key, slice) and key == slice(0, self.num_rows):
# shortcut for selecting all the rows
return self
elif isinstance(key, utils.Index) and key.is_slice(0, self.num_rows):
# shortcut for selecting all the rows
return self
else:
return self.select_rows(key)
......@@ -616,6 +632,12 @@ class FrameRef(MutableMapping):
"""
if isinstance(key, str):
self.update_column(key, val, inplace=False)
elif isinstance(key, slice) and key == slice(0, self.num_rows):
# shortcut for updating all the rows
return self.update(val)
elif isinstance(key, utils.Index) and key.is_slice(0, self.num_rows):
# shortcut for selecting all the rows
return self.update(val)
else:
self.update_rows(key, val, inplace=False)
......@@ -807,6 +829,33 @@ class FrameRef(MutableMapping):
self._index = None
self._index_or_slice = None
def frame_like(other, num_rows):
    """Create a new frame that has the same initializers as the given one.

    Only the global and per-column initializers are copied; the column
    schemes are NOT inherited (see the TODO below), so the new frame
    starts without columns.

    Parameters
    ----------
    other : Frame
        The frame to copy initializers from.
    num_rows : int
        The number of rows of the new frame.

    Returns
    -------
    Frame
        The new frame.
    """
    # TODO(minjie): scheme is not inherited at the moment. Fix this
    #   when moving per-col initializer to column scheme.
    newf = Frame(num_rows=num_rows)
    # Set the global (default) initializer; warn if the source frame never
    # had one set explicitly.
    if other.get_initializer() is None:
        other._warn_and_set_initializer()
    newf._default_initializer = other._default_initializer
    # Copy the per-column initializers.
    for key in other.keys():
        newf.set_initializer(other.get_initializer(key), key)
    return newf
def merge_frames(frames, indices, max_index, reduce_func):
"""Merge a list of frames.
......
......@@ -3,3 +3,4 @@ from __future__ import absolute_import
from .message import *
from .reducer import *
from .base import *
"""Built-in functions."""
"""Built-in function base class"""
from __future__ import absolute_import
from functools import update_wrapper
class BuiltinFunction(object):
"""Base builtin function class."""
__all__ = ['create_bundled_function_class']
def __call__(self):
"""Regular computation of this builtin function
def create_bundled_function_class(name, cls):
class Bundled(cls):
This will be used when optimization is not available.
"""
raise NotImplementedError
@property
def name(self):
"""Return the name of this builtin function."""
raise NotImplementedError
class BundledFunction(object):
def __init__(self, fn_list):
if not isinstance(fn_list, (list, tuple)):
fn_list = [fn_list]
self.fn_list = fn_list
def is_spmv_supported(self, *args, **kwargs):
return all(isinstance(fn, cls) and
fn.is_spmv_supported(*args, **kwargs)
for fn in self.fn_list)
def __call__(self, *args, **kwargs):
ret = {}
for fn in self.fn_list:
result = fn(*args, **kwargs)
ret.update(result)
ret.update(fn(*args, **kwargs))
return ret
@property
def name(self):
return "bundled"
# Fake the names for introspection
Bundled.__module__ = cls.__module__
Bundled.__name__ = name
Bundled.__qualname__ = name
for method_name in ('__init__', '__call__', 'is_spmv_supported', 'name'):
method = getattr(Bundled, method_name)
method.__qualname__ = '{}.{}'.format(Bundled.__qualname__, method_name)
for method_name in ('__call__', 'is_spmv_supported', 'name'):
method = getattr(Bundled, method_name)
method = update_wrapper(method,
cls.__dict__[method.__name__],
('__module__', '__doc__', '__annotations__'))
return Bundled
"""Built-in message function."""
from __future__ import absolute_import
from .base import BuiltinFunction
import operator
import dgl.backend as F
from .base import create_bundled_function_class
__all__ = ["src_mul_edge", "copy_src", "copy_edge"]
class MessageFunction(object):
class MessageFunction(BuiltinFunction):
"""Base builtin message function class."""
def __call__(self, edges):
......@@ -18,6 +18,7 @@ class MessageFunction(object):
"""
raise NotImplementedError
@property
def name(self):
"""Return the name of this builtin function."""
raise NotImplementedError
......@@ -26,9 +27,9 @@ class MessageFunction(object):
"""Return whether the SPMV optimization is supported."""
raise NotImplementedError
BundledMessageFunction = create_bundled_function_class(
'BundledMessageFunction', MessageFunction)
@property
def use_edge_feature(self):
raise NotImplementedError
def _is_spmv_supported_node_feat(g, field):
......@@ -64,15 +65,22 @@ class SrcMulEdgeMessageFunction(MessageFunction):
def __call__(self, edges):
src_data = edges.src[self.src_field]
edata = edges.data[self.edge_field]
if F.ndim(edata) == 1:
# edge feature is a scalar, unsqueeze dims of len 1
src_dim = F.ndim(src_data)
eshape = F.shape(edata)[0]
ret = self.mul_op(edges.src[self.src_field],
F.reshape(edges.data[self.edge_field], (eshape,) + (1,) * (src_dim - 1)))
new_eshape = (F.shape(edata)[0],) + (1,) * (src_dim - 1)
edata = F.reshape(edata, new_eshape)
ret = self.mul_op(src_data, edata)
return {self.out_field : ret}
@property
def name(self):
return "src_mul_edge"
@property
def use_edge_feature(self):
return True
class CopySrcMessageFunction(MessageFunction):
def __init__(self, src_field, out_field):
self.src_field = src_field
......@@ -84,9 +92,14 @@ class CopySrcMessageFunction(MessageFunction):
def __call__(self, edges):
return {self.out_field : edges.src[self.src_field]}
@property
def name(self):
return "copy_src"
@property
def use_edge_feature(self):
return False
class CopyEdgeMessageFunction(MessageFunction):
def __init__(self, edge_field=None, out_field=None):
self.edge_field = edge_field
......@@ -100,9 +113,14 @@ class CopyEdgeMessageFunction(MessageFunction):
def __call__(self, edges):
return {self.out_field : edges.data[self.edge_field]}
@property
def name(self):
return "copy_edge"
@property
def use_edge_feature(self):
return True
def src_mul_edge(src, edge, out):
"""Builtin message function that computes message by multiplying source node features
......
......@@ -2,11 +2,11 @@
from __future__ import absolute_import
from .. import backend as F
from .base import create_bundled_function_class
from .base import BuiltinFunction
__all__ = ["sum", "max"]
class ReduceFunction(object):
class ReduceFunction(BuiltinFunction):
"""Base builtin reduce function class."""
def __call__(self, nodes):
......@@ -16,6 +16,7 @@ class ReduceFunction(object):
"""
raise NotImplementedError
@property
def name(self):
"""Return the name of this builtin function."""
raise NotImplementedError
......@@ -25,10 +26,6 @@ class ReduceFunction(object):
raise NotImplementedError
BundledReduceFunction = create_bundled_function_class(
'BundledReduceFunction', ReduceFunction)
class SimpleReduceFunction(ReduceFunction):
"""Builtin reduce function that aggregates a single field into another
single field."""
......@@ -45,6 +42,7 @@ class SimpleReduceFunction(ReduceFunction):
def __call__(self, nodes):
return {self.out_field : self.op(nodes.mailbox[self.msg_field], 1)}
@property
def name(self):
return self._name
......
......@@ -8,11 +8,8 @@ import dgl
from .base import ALL, is_all, DGLError, dgl_warning
from . import backend as F
from .frame import FrameRef, Frame, merge_frames
from .function.message import BundledMessageFunction
from .function.reducer import BundledReduceFunction
from .graph_index import GraphIndex, create_graph_index
from . import scheduler
from .udf import NodeBatch, EdgeBatch
from .runtime import ir, scheduler, Runtime
from . import utils
from .view import NodeView, EdgeView
......@@ -735,7 +732,7 @@ class DGLGraph(object):
"""
return self._edge_frame.schemes
def set_n_initializer(self, initializer):
def set_n_initializer(self, initializer, field=None):
"""Set the initializer for empty node features.
Initializer is a callable that returns a tensor given the shape, data type
......@@ -745,10 +742,17 @@ class DGLGraph(object):
----------
initializer : callable
The initializer.
field : str, optional
    The feature field name. By default, the initializer is set for all
    feature fields.
See Also
--------
dgl.init.base_initializer
"""
self._node_frame.set_initializer(initializer)
self._node_frame.set_initializer(initializer, field)
def set_e_initializer(self, initializer):
def set_e_initializer(self, initializer, field=None):
"""Set the initializer for empty edge features.
Initializer is a callable that returns a tensor given the shape, data type
......@@ -758,8 +762,15 @@ class DGLGraph(object):
----------
initializer : callable
The initializer.
field : str, optional
    The feature field name. By default, the initializer is set for all
    feature fields.
See Also
--------
dgl.init.base_initializer
"""
self._edge_frame.set_initializer(initializer)
self._edge_frame.set_initializer(initializer, field)
@property
def nodes(self):
......@@ -1017,7 +1028,15 @@ class DGLGraph(object):
v : int, iterable of int, tensor, optional
The node id(s).
"""
self._internal_apply_nodes(v, func, inplace=inplace)
if func == "default":
func = self._apply_node_func
if is_all(v):
v = utils.toindex(slice(0, self.number_of_nodes()))
else:
v = utils.toindex(v)
with ir.prog() as prog:
scheduler.schedule_apply_nodes(graph=self, v=v, apply_func=func)
Runtime.run(prog)
def apply_edges(self, func="default", edges=ALL):
"""Apply the function on the edge features.
......@@ -1040,8 +1059,7 @@ class DGLGraph(object):
assert func is not None
if is_all(edges):
eid = ALL
u, v, _ = self._graph.edges()
u, v, eid = self._graph.edges()
elif isinstance(edges, tuple):
u, v = edges
u = utils.toindex(u)
......@@ -1052,12 +1070,10 @@ class DGLGraph(object):
eid = utils.toindex(edges)
u, v, _ = self._graph.find_edges(eid)
src_data = self.get_n_repr(u)
edge_data = self.get_e_repr(eid)
dst_data = self.get_n_repr(v)
eb = EdgeBatch(self, (u, v, eid),
src_data, edge_data, dst_data)
self.set_e_repr(func(eb), eid)
with ir.prog() as prog:
scheduler.schedule_apply_edges(graph=self, u=u, v=v,
eid=eid, apply_func=func)
Runtime.run(prog)
def send(self, edges, message_func="default"):
"""Send messages along the given edges.
......@@ -1078,8 +1094,6 @@ class DGLGraph(object):
if message_func == "default":
message_func = self._message_func
assert message_func is not None
if isinstance(message_func, (tuple, list)):
message_func = BundledMessageFunction(message_func)
if is_all(edges):
eid = ALL
......@@ -1094,20 +1108,19 @@ class DGLGraph(object):
eid = utils.toindex(edges)
u, v, _ = self._graph.find_edges(eid)
src_data = self.get_n_repr(u)
edge_data = self.get_e_repr(eid)
dst_data = self.get_n_repr(v)
eb = EdgeBatch(self, (u, v, eid),
src_data, edge_data, dst_data)
msgs = message_func(eb)
with ir.prog() as prog:
scheduler.schedule_send(graph=self, u=u, v=v, eid=eid,
message_func=message_func)
Runtime.run(prog)
# update message graph and frame
self._msg_graph.add_edges(u, v)
self._msg_frame.append(msgs)
def recv(self,
u,
v,
reduce_func="default",
apply_node_func="default"):
"""Receive and reduce in-coming messages and update representation on node u.
"""Receive and reduce in-coming messages and update representation on node v.
TODO(minjie): document on zero-in-degree case
TODO(minjie): document on how returned new features are merged with the old features
......@@ -1115,7 +1128,7 @@ class DGLGraph(object):
Parameters
----------
u : node, container or tensor
v : node, container or tensor
The node to be updated.
reduce_func : callable
The reduce function.
......@@ -1124,14 +1137,10 @@ class DGLGraph(object):
"""
if reduce_func == "default":
reduce_func = self._reduce_func
if apply_node_func == "default":
apply_node_func = self._apply_node_func
assert reduce_func is not None
if isinstance(reduce_func, (list, tuple)):
reduce_func = BundledReduceFunction(reduce_func)
self._batch_recv(u, reduce_func)
# optional apply nodes
self.apply_nodes(apply_node_func, u)
def _batch_recv(self, v, reduce_func):
if self._msg_frame.num_rows == 0:
# no message has ever been sent
return
......@@ -1146,54 +1155,14 @@ class DGLGraph(object):
# no vertex to be triggered.
return
# degree bucketing
degrees, v_buckets = scheduler.degree_bucketing(self._msg_graph, v)
if degrees == [0]:
# no message has been sent to the specified node
return
with ir.prog() as prog:
scheduler.schedule_recv(graph=self, recv_nodes=v,
reduce_func=reduce_func, apply_func=apply_node_func)
Runtime.run(prog)
reordered_v = []
new_reprs = []
has_zero_degree = False
for deg, v_bkt in zip(degrees, v_buckets):
if deg == 0:
# no need to trigger reduce func for zero-degree nodes
has_zero_degree = True
continue
bkt_len = len(v_bkt)
v_data = self.get_n_repr(v_bkt)
uu, vv, in_msg_ids = self._msg_graph.in_edges(v_bkt)
in_msgs = self._msg_frame.select_rows(in_msg_ids)
# Reshape the column tensor to (B, Deg, ...).
def _reshape_fn(msg):
msg_shape = F.shape(msg)
new_shape = (bkt_len, deg) + msg_shape[1:]
return F.reshape(msg, new_shape)
reshaped_in_msgs = utils.LazyDict(
lambda key: _reshape_fn(in_msgs[key]), self._msg_frame.schemes)
reordered_v.append(v_bkt.tousertensor())
nb = NodeBatch(self, v_bkt, v_data, reshaped_in_msgs)
new_reprs.append(reduce_func(nb))
# TODO(minjie): clear partial messages
# FIXME(minjie): multi send bug
self.reset_messages()
# Pack all reducer results together
reordered_v = F.cat(reordered_v, dim=0)
keys = new_reprs[0].keys()
new_reprs = {key : F.cat([repr[key] for repr in new_reprs], dim=0)
for key in keys}
if v_is_all and not has_zero_degree:
# First do reorder and then replace the whole column.
_, indices = F.sort_1d(reordered_v)
indices = utils.toindex(indices)
new_reprs = utils.reorder(new_reprs, indices)
self.set_n_repr(new_reprs)
else:
# Use setter to do reorder.
self.set_n_repr(new_reprs, reordered_v)
def send_and_recv(self,
edges,
message_func="default",
......@@ -1223,12 +1192,11 @@ class DGLGraph(object):
"""
if message_func == "default":
message_func = self._message_func
elif isinstance(message_func, (tuple, list)):
message_func = BundledMessageFunction(message_func)
if reduce_func == "default":
reduce_func = self._reduce_func
elif isinstance(reduce_func, (list, tuple)):
reduce_func = BundledReduceFunction(reduce_func)
if apply_node_func == "default":
apply_node_func = self._apply_node_func
assert message_func is not None
assert reduce_func is not None
......@@ -1246,35 +1214,10 @@ class DGLGraph(object):
# no edges to be triggered
return
if not self.is_multigraph:
executor = scheduler.get_executor(
'send_and_recv', self, src=u, dst=v,
message_func=message_func, reduce_func=reduce_func)
else:
executor = None
if executor:
accum = executor.run()
unique_v = executor.recv_nodes
else:
# message func
src_data = self.get_n_repr(u)
edge_data = self.get_e_repr(eid)
dst_data = self.get_n_repr(v)
eb = EdgeBatch(self, (u, v, eid),
src_data, edge_data, dst_data)
msgs = message_func(eb)
msg_frame = FrameRef(Frame(msgs))
# recv with degree bucketing
executor = scheduler.get_recv_executor(graph=self,
reduce_func=reduce_func,
message_frame=msg_frame,
edges=(u, v))
assert executor is not None
accum = executor.run()
unique_v = executor.recv_nodes
self._internal_apply_nodes(unique_v, apply_node_func, reduce_accum=accum)
with ir.prog() as prog:
scheduler.schedule_snr(self, (u, v, eid),
message_func, reduce_func, apply_node_func)
Runtime.run(prog)
def pull(self,
v,
......@@ -1294,13 +1237,24 @@ class DGLGraph(object):
apply_node_func : callable, optional
The update function.
"""
if message_func == "default":
message_func = self._message_func
if reduce_func == "default":
reduce_func = self._reduce_func
if apply_node_func == "default":
apply_node_func = self._apply_node_func
assert message_func is not None
assert reduce_func is not None
v = utils.toindex(v)
if len(v) == 0:
return
uu, vv, _ = self._graph.in_edges(v)
self.send_and_recv((uu, vv), message_func, reduce_func, apply_node_func=None)
unique_v = F.unique(v.tousertensor())
self.apply_nodes(apply_node_func, unique_v)
with ir.prog() as prog:
scheduler.schedule_pull(graph=self, v=v,
message_func=message_func, reduce_func=reduce_func,
apply_func=apply_node_func)
Runtime.run(prog)
def push(self,
u,
......@@ -1320,12 +1274,24 @@ class DGLGraph(object):
apply_node_func : callable
The update function.
"""
if message_func == "default":
message_func = self._message_func
if reduce_func == "default":
reduce_func = self._reduce_func
if apply_node_func == "default":
apply_node_func = self._apply_node_func
assert message_func is not None
assert reduce_func is not None
u = utils.toindex(u)
if len(u) == 0:
return
uu, vv, _ = self._graph.out_edges(u)
self.send_and_recv((uu, vv), message_func,
reduce_func, apply_node_func)
with ir.prog() as prog:
scheduler.schedule_push(graph=self, u=u,
message_func=message_func, reduce_func=reduce_func,
apply_func=apply_node_func)
Runtime.run(prog)
def update_all(self,
message_func="default",
......@@ -1346,17 +1312,15 @@ class DGLGraph(object):
message_func = self._message_func
if reduce_func == "default":
reduce_func = self._reduce_func
if apply_node_func == "default":
apply_node_func = self._apply_node_func
assert message_func is not None
assert reduce_func is not None
executor = scheduler.get_executor(
"update_all", self, message_func=message_func, reduce_func=reduce_func)
if executor:
new_reprs = executor.run()
self._internal_apply_nodes(ALL, apply_node_func, reduce_accum=new_reprs)
else:
self.send(ALL, message_func)
self.recv(ALL, reduce_func, apply_node_func)
with ir.prog() as prog:
scheduler.schedule_update_all(graph=self, message_func=message_func,
reduce_func=reduce_func, apply_func=apply_node_func)
Runtime.run(prog)
def prop_nodes(self,
nodes_generator,
......@@ -1534,34 +1498,44 @@ class DGLGraph(object):
Returns
-------
sparse_tensor
SparseTensor
The adjacency matrix.
"""
if not isinstance(transpose, bool):
raise DGLError('Expect bool value for "transpose" arg,'
' but got %s.' % (type(transpose)))
return self._graph.adjacency_matrix(transpose, ctx)
def incidence_matrix(self, oriented=False, ctx=F.cpu()):
def incidence_matrix(self, type, ctx=F.cpu()):
"""Return the incidence matrix representation of this graph.
An incidence matrix is an n x m sparse matrix, where n is
the number of nodes and m is the number of edges. Each nnz
value indicating whether the edge is incident to the node
or not.
There are three types of an incidence matrix `I`:
* "in":
- I[v, e] = 1 if e is the in-edge of v (or v is the dst node of e);
- I[v, e] = 0 otherwise.
* "out":
- I[v, e] = 1 if e is the out-edge of v (or v is the src node of e);
- I[v, e] = 0 otherwise.
* "both":
- I[v, e] = 1 if e is the in-edge of v;
- I[v, e] = -1 if e is the out-edge of v;
- I[v, e] = 0 otherwise (including self-loop).
Parameters
----------
oriented : bool, optional
Whether the returned incidence matrix is oriented.
ctx : optional
type : str
Can be either "in", "out" or "both"
ctx : context, optional (default=cpu)
The context of returned incidence matrix.
Returns
-------
sparse_tensor
SparseTensor
The incidence matrix.
"""
if not isinstance(oriented, bool):
raise DGLError('Expect bool value for "oriented" arg,'
' but got %s.' % (type(oriented)))
return self._graph.incidence_matrix(oriented, ctx)
return self._graph.incidence_matrix(type, ctx)
def line_graph(self, backtracking=True, shared=False):
"""Return the line graph of this graph.
......@@ -1637,33 +1611,3 @@ class DGLGraph(object):
else:
edges = F.tensor(edges)
return edges[e_mask]
def _internal_apply_nodes(self, v, apply_node_func="default", reduce_accum=None,
inplace=False):
"""Internal apply nodes
Parameters
----------
reduce_accum: dict-like
The output of reduce func
"""
if apply_node_func == "default":
apply_node_func = self._apply_node_func
if not apply_node_func:
# Skip none function call.
if reduce_accum is not None:
# write reduce result back
self.set_n_repr(reduce_accum, v, inplace=inplace)
return
# take out current node repr
curr_repr = self.get_n_repr(v)
if reduce_accum is not None:
# merge current node_repr with reduce output
curr_repr = utils.HybridDict(reduce_accum, curr_repr)
nb = NodeBatch(self, v, curr_repr)
new_repr = apply_node_func(nb)
if reduce_accum is not None:
# merge new node_repr with reduce output
reduce_accum.update(new_repr)
new_repr = reduce_accum
self.set_n_repr(new_repr, v, inplace=inplace)
......@@ -474,7 +474,29 @@ class GraphIndex(object):
induced_nodes = utils.toindex(rst(1))
return SubgraphIndex(rst(0), self, induced_nodes, e)
def adjacency_matrix(self, transpose=False, ctx=F.cpu()):
def adjacency_matrix_indices_and_shape(self, transpose=False):
    """Return the indices and dense shape of the adjacency matrix of this graph.

    Parameters
    ----------
    transpose : bool, optional
        If True, the index rows are stacked as (src, dst) instead of
        (dst, src).

    Returns
    -------
    utils.CtxCachedObject
        An object that returns the indices tensor given a context.
    tuple
        Dense shape (n, n) of the adjacency matrix.
    """
    # BUG FIX: the cache key must distinguish the transposed and the
    # non-transposed variants; the original cached under a single key,
    # so after one call the other transpose value returned wrong indices.
    # The transpose=False key is kept unchanged for compatibility.
    key = 'adj_ind_shape_T' if transpose else 'adj_ind_shape'
    if key not in self._cache:
        src, dst, _ = self.edges(sorted=False)
        src = F.unsqueeze(src.tousertensor(), 0)
        dst = F.unsqueeze(dst.tousertensor(), 0)
        n = self.number_of_nodes()
        if transpose:
            idx = F.cat([src, dst], dim=0)
        else:
            idx = F.cat([dst, src], dim=0)
        # Lazily copy the stacked index tensor to the requested context;
        # CtxCachedObject caches one copy per context.
        cached_idx = utils.CtxCachedObject(lambda ctx: F.copy_to(idx, ctx))
        self._cache[key] = (cached_idx, (n, n))
    return self._cache[key]
def adjacency_matrix(self, transpose, ctx):
"""Return the adjacency matrix representation of this graph.
By default, a row of returned adjacency matrix represents the destination
......@@ -485,14 +507,19 @@ class GraphIndex(object):
Parameters
----------
transpose : bool, optional (default=False)
transpose : bool
A flag to tranpose the returned adjacency matrix.
ctx : context
The context of the returned matrix.
Returns
-------
SparseTensor
The adjacency matrix.
"""
if not isinstance(transpose, bool):
raise DGLError('Expect bool value for "transpose" arg,'
' but got %s.' % (type(transpose)))
src, dst, _ = self.edges(sorted=False)
src = src.tousertensor(ctx) # the index of the ctx will be cached
dst = dst.tousertensor(ctx) # the index of the ctx will be cached
......@@ -503,18 +530,38 @@ class GraphIndex(object):
else:
idx = F.cat([dst, src], dim=0)
n = self.number_of_nodes()
m = self.number_of_edges()
# FIXME(minjie): data type
dat = F.ones((self.number_of_edges(),), dtype=F.float32, ctx=ctx)
dat = F.ones((m,), dtype=F.float32, ctx=ctx)
adj = F.sparse_matrix(dat, ('coo', idx), (n, n))
return adj
def incidence_matrix(self, oriented=False, ctx=F.cpu()):
def incidence_matrix(self, type, ctx):
"""Return the incidence matrix representation of this graph.
An incidence matrix is an n x m sparse matrix, where n is
the number of nodes and m is the number of edges. Each nnz
value indicating whether the edge is incident to the node
or not.
There are three types of an incidence matrix `I`:
* "in":
- I[v, e] = 1 if e is the in-edge of v (or v is the dst node of e);
- I[v, e] = 0 otherwise.
* "out":
- I[v, e] = 1 if e is the out-edge of v (or v is the src node of e);
- I[v, e] = 0 otherwise.
* "both":
- I[v, e] = 1 if e is the in-edge of v;
- I[v, e] = -1 if e is the out-edge of v;
- I[v, e] = 0 otherwise (including self-loop).
Parameters
----------
oriented : bool, optional (default=False)
Whether the returned incidence matrix is oriented.
type : str
Can be either "in", "out" or "both"
ctx : context
The context of returned incidence matrix.
Returns
-------
......@@ -527,25 +574,36 @@ class GraphIndex(object):
eid = eid.tousertensor(ctx) # the index of the ctx will be cached
n = self.number_of_nodes()
m = self.number_of_edges()
if type == 'in':
row = F.unsqueeze(dst, 0)
col = F.unsqueeze(eid, 0)
idx = F.cat([row, col], dim=0)
# FIXME(minjie): data type
dat = F.ones((m,), dtype=F.float32, ctx=ctx)
inc = F.sparse_matrix(dat, ('coo', idx), (n, m))
elif type == 'out':
row = F.unsqueeze(src, 0)
col = F.unsqueeze(eid, 0)
idx = F.cat([row, col], dim=0)
# FIXME(minjie): data type
dat = F.ones((m,), dtype=F.float32, ctx=ctx)
inc = F.sparse_matrix(dat, ('coo', idx), (n, m))
elif type == 'both':
# create index
row = F.unsqueeze(F.cat([src, dst], dim=0), 0)
col = F.unsqueeze(F.cat([eid, eid], dim=0), 0)
idx = F.cat([row, col], dim=0)
# create data
diagonal = (src == dst)
if oriented:
# FIXME(minjie): data type
x = -F.ones((m,), dtype=F.float32, ctx=ctx)
y = F.ones((m,), dtype=F.float32, ctx=ctx)
x[diagonal] = 0
y[diagonal] = 0
dat = F.cat([x, y], dim=0)
else:
# FIXME(minjie): data type
x = F.ones((m,), dtype=F.float32, ctx=ctx)
x[diagonal] = 0
dat = F.cat([x, x], dim=0)
inc = F.sparse_matrix(dat, ('coo', idx), (n, m))
else:
raise DGLError('Invalid incidence matrix type: %s' % str(type))
return inc
def to_networkx(self):
......
"""Module for common feature initializers."""
from __future__ import absolute_import
from . import backend as F
__all__ = ['base_initializer', 'zero_initializer']
def base_initializer(shape, dtype, ctx, range):
    """Signature prototype for feature initializers.

    A concrete initializer is any callable with this signature; this
    function itself only documents the contract and always raises.

    Parameters
    ----------
    shape : tuple of int
        The shape of the result features. The first dimension is the
        batch dimension.
    dtype : data type object
        The data type of the returned features.
    ctx : context object
        The device context of the returned features.
    range : slice
        The start id and the end id of the features to be initialized.
        The id could be node or edge id depending on the scenario.
        Note that the step is always None.

    Raises
    ------
    NotImplementedError
        Always; callers must supply a concrete initializer.
    """
    raise NotImplementedError
def zero_initializer(shape, dtype, ctx, range):
    """Feature initializer that fills new rows with zeros.

    The *range* argument is part of the initializer signature (see
    ``base_initializer``) but is irrelevant here: every entry is zero
    regardless of its position.
    """
    zeros = F.zeros(shape, dtype, ctx)
    return zeros
"""DGL Runtime"""
from __future__ import absolute_import
from . import scheduler
from .runtime import Runtime
"""Module for degree bucketing schedulers"""
from __future__ import absolute_import
from .._ffi.function import _init_api
from ..base import is_all, ALL
from .. import backend as F
from ..immutable_graph_index import ImmutableGraphIndex
from ..udf import EdgeBatch, NodeBatch
from .. import utils
from . import ir
from .ir import var as var
def gen_degree_bucketing_schedule(
        graph,
        reduce_udf,
        message_ids,
        dst_nodes,
        recv_nodes,
        var_nf,
        var_mf,
        var_out):
    """Create degree bucketing schedule.

    The messages are divided by their receivers into buckets. Each bucket
    contains nodes that have the same in-degree. The reduce UDF is applied
    once per bucket. The per-bucket results are merged according to the
    *unique-ascending order* of the recv node ids. The order is important
    to be compatible with other reduce schedulers such as v2v_spmv.

    Parameters
    ----------
    graph : DGLGraph
        DGLGraph to use.
    reduce_udf : callable
        The UDF to reduce messages.
    message_ids : utils.Index
        The message ids.
        Invariant: len(message_ids) == len(dst_nodes)
    dst_nodes : utils.Index
        The dst node of each message.
        Invariant: len(message_ids) == len(dst_nodes)
    recv_nodes : utils.Index
        The unique nodes that perform recv.
        Invariant: recv_nodes = sort(unique(dst_nodes))
    var_nf : var.FEAT_DICT
        The variable for the node feature frame.
    var_mf : var.FEAT_DICT
        The variable for the message frame.
    var_out : var.FEAT_DICT
        The variable for the output feature dicts.
    """
    # Bucket messages by the in-degree of their destination nodes.
    buckets = _degree_bucketing_schedule(message_ids, dst_nodes, recv_nodes)
    # Unpack the bucketing result.
    # NOTE(review): unique_dst is unpacked but not used below.
    unique_dst, degs, buckets, msg_ids, zero_deg_nodes = buckets
    # Emit one reduce call per bucket, collecting each bucket's node index
    # and produced feature dict for the final merge.
    idx_list = []
    fd_list = []
    for deg, vb, mid in zip(degs, buckets, msg_ids):
        # Create the reduce function specialized for this bucket/degree.
        rfunc = _create_per_bkt_rfunc(graph, reduce_udf, deg, vb)
        # Lift ids and the function into IR variables.
        vb = var.IDX(vb)
        mid = var.IDX(mid)
        rfunc = var.FUNC(rfunc)
        # Recv on this bucket: read the node rows and message rows, then
        # run the reduce UDF on them.
        fdvb = ir.READ_ROW(var_nf, vb)
        fdmail = ir.READ_ROW(var_mf, mid)
        fdvb = ir.NODE_UDF(rfunc, fdvb, fdmail, ret=fdvb) # reuse var
        # Save for the merge step below.
        idx_list.append(vb)
        fd_list.append(fdvb)
    # Merge buckets according to the ascending order of the node ids, so
    # the output rows line up with sort(unique(dst_nodes)).
    all_idx = F.cat([idx.data.tousertensor() for idx in idx_list], dim=0)
    sorted_idx, order = F.sort_1d(all_idx)
    var_sorted_idx = var.IDX(utils.toindex(sorted_idx))
    var_order = var.IDX(utils.toindex(order))
    reduced_feat = ir.MERGE_ROW(var_order, fd_list)
    if zero_deg_nodes is not None:
        # If there are zero-degree nodes, scatter the result back into the
        # frame row-by-row so that the features for zero-degree nodes are
        # initialized correctly.
        ir.WRITE_ROW_(var_out, var_sorted_idx, reduced_feat)
    else:
        ir.WRITE_DICT_(var_out, reduced_feat)
def _degree_bucketing_schedule(mids, dsts, v):
    """Compute the degree-bucketing schedule for message destinations.

    Parameters
    ----------
    mids: utils.Index
        edge id for each message
    dsts: utils.Index
        destination node for each message
    v: utils.Index
        all receiving nodes (for checking zero degree nodes)
    """
    raw_buckets = _CAPI_DGLDegreeBucketing(
        mids.todgltensor(), dsts.todgltensor(), v.todgltensor())
    return _process_buckets(raw_buckets)
def _degree_bucketing_for_edges(dsts):
    """Compute the degree-bucketing schedule from message destinations only.

    Parameters
    ----------
    dsts: utils.Index
        destination node for each message
    """
    raw_buckets = _CAPI_DGLDegreeBucketingForEdges(dsts.todgltensor())
    return _process_buckets(raw_buckets)
def _degree_bucketing_for_graph(graph, v):
    """Compute the degree-bucketing schedule from a graph index and
    optional destination nodes.

    Parameters:
    -----------
    graph: GraphIndex
        DGLGraph Index (update all case) or message graph index (recv cases)
    v: utils.Index
        Destination nodes (recv cases)
    """
    if not is_all(v):
        # recv on an explicit node set
        raw_buckets = _CAPI_DGLDegreeBucketingForRecvNodes(graph._handle,
                                                           v.todgltensor())
    else:
        # full-graph case
        raw_buckets = _CAPI_DGLDegreeBucketingForFullGraph(graph._handle)
    return _process_buckets(raw_buckets)
def _process_buckets(buckets):
    """Read degree-bucketing auxiliary data returned by the C API.

    Returns
    -------
    unique_v: utils.Index
        unique destination nodes
    degrees: list of int
        A list of degree for each bucket (the zero-degree bucket, if any,
        is removed)
    v_bkt: list of utils.Index
        A list of node id buckets, nodes in each bucket have the same degree
    msg_ids: list of utils.Index
        A list of message id buckets, each node in the ith node id bucket has
        degree[i] messages in the ith message id bucket
    zero_deg_nodes : utils.Index or None
        The zero-degree nodes, or None if every node has messages
    """
    # get back results
    degs = utils.toindex(buckets(0))
    v = utils.toindex(buckets(1))
    # XXX: convert directly from ndarray to python list?
    v_section = buckets(2).asnumpy().tolist()
    msg_ids = utils.toindex(buckets(3))
    msg_section = buckets(4).asnumpy().tolist()
    # split buckets
    msg_ids = msg_ids.tousertensor()
    dsts = F.split(v.tousertensor(), v_section, 0)
    msg_ids = F.split(msg_ids, msg_section, 0)
    # convert to utils.Index
    dsts = [utils.toindex(dst) for dst in dsts]
    msg_ids = [utils.toindex(msg_id) for msg_id in msg_ids]
    # handle zero deg
    degs = degs.tolist()
    if degs[-1] == 0:
        # the zero-degree bucket, when present, is last; strip it and
        # report its nodes separately
        degs = degs[:-1]
        zero_deg_nodes = dsts[-1]
        dsts = dsts[:-1]
    else:
        zero_deg_nodes = None
    return v, degs, dsts, msg_ids, zero_deg_nodes
def _create_per_bkt_rfunc(graph, reduce_udf, deg, vb):
    """Wrap ``reduce_udf`` so each mailbox tensor is reshaped to
    (num_nodes_in_bucket, degree, ...) before the UDF runs.
    """
    def _rfunc_wrapper(node_data, mail_data):
        def _reshaped_getter(key):
            feat = mail_data[key]
            bucket_shape = (len(vb), deg) + F.shape(feat)[1:]
            return F.reshape(feat, bucket_shape)
        # reshape lazily, only for the fields the UDF actually touches
        reshaped = utils.LazyDict(_reshaped_getter, mail_data.keys())
        batch = NodeBatch(graph, vb, node_data, reshaped)
        return reduce_udf(batch)
    return _rfunc_wrapper
# Register the C API symbols (e.g. _CAPI_DGLDegreeBucketing*) into this module.
_init_api("dgl.runtime.degree_bucketing")
from .executor import *
from .program import get_current_prog, prog
from __future__ import absolute_import
from abc import abstractmethod
from ... import backend as F
from ...frame import FrameRef, Frame
from ... import utils
from .program import get_current_prog
from . import var
from .var import VarType
from .registry import IR_REGISTRY
class OpCode(object):
    """Opcodes of the IR executors.

    Immutable ops produce a return variable; mutable ops (names suffixed
    with "_") modify their first argument in place and return nothing.
    """
    # immutable op
    NODE_UDF = 0
    EDGE_UDF = 1
    SPMV = 2
    SPMV_WITH_DATA = 3
    READ = 4
    READ_COL = 5
    READ_ROW = 6
    MERGE_ROW = 7
    UPDATE_DICT = 8
    # mutable op (no return)
    # remember the name is suffixed with "_"
    WRITE_ = 21
    WRITE_COL_ = 22
    WRITE_ROW_ = 23
    WRITE_DICT_ = 24
    APPEND_ROW_ = 25
class Executor(object):
    """Abstract base class of all IR executors.

    NOTE(review): this is a plain ``object`` subclass, so ``@abstractmethod``
    is not enforced at instantiation time (that requires ABCMeta); subclasses
    are expected to override every method.
    """
    @abstractmethod
    def opcode(self):
        """Return the OpCode value of this executor."""
        raise NotImplementedError
    @abstractmethod
    def arg_vars(self):
        """Return the list of argument variables."""
        raise NotImplementedError
    @abstractmethod
    def ret_var(self):
        """Return the result variable, or None for mutable ops."""
        raise NotImplementedError
    @abstractmethod
    def run(self):
        """Execute the op, concretizing the result variable's data."""
        raise NotImplementedError
class NodeUDFExecutor(Executor):
    """Apply a node UDF.

    Calls ``fn(node_data)`` or, when a mail frame is given,
    ``fn(node_data, mail_data)``, and stores the resulting dict as a new
    FrameRef in the return variable.
    """
    def __init__(self, fn, fdnode, fdmail, ret):
        # fn     : var.FUNC wrapping the UDF
        # fdnode : var.FEAT_DICT of node features
        # fdmail : var.FEAT_DICT of mailbox features, or None
        # ret    : return variable
        self.fn = fn
        self.fdnode = fdnode
        self.fdmail = fdmail
        self.ret = ret
    def opcode(self):
        return OpCode.NODE_UDF
    def arg_vars(self):
        if self.fdmail is None:
            return [self.fn, self.fdnode]
        else:
            return [self.fn, self.fdnode, self.fdmail]
    def ret_var(self):
        return self.ret
    def run(self):
        fn_data = self.fn.data
        node_data = self.fdnode.data
        if self.fdmail is None:
            udf_ret = fn_data(node_data)
        else:
            mail_data = self.fdmail.data
            udf_ret = fn_data(node_data, mail_data)
        # wrap the UDF's dict result into a frame reference
        self.ret.data = FrameRef(Frame(udf_ret))
IR_REGISTRY[OpCode.NODE_UDF] = {
    'name' : 'NODE_UDF',
    # args_type lists the maximal arity; fdmail may be omitted at call sites
    'args_type' : [VarType.FUNC, VarType.FEAT_DICT, VarType.FEAT_DICT],
    'ret_type' : VarType.FEAT_DICT,
    'executor_cls' : NodeUDFExecutor,
}
def NODE_UDF(fn, fdnode, fdmail=None, ret=None):
    """Issue a NODE_UDF op on the current program; return the result var.

    A fresh return variable is created unless ``ret`` is given.
    """
    reg = IR_REGISTRY[OpCode.NODE_UDF]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](fn, fdnode, fdmail, ret))
    return ret
class EdgeUDFExecutor(Executor):
    """Apply an edge UDF.

    Calls ``fn(src_data, edge_data, dst_data)`` and stores the resulting
    dict as a new FrameRef in the return variable.
    """
    def __init__(self, fn, fdsrc, fdedge, fddst, ret):
        # fn     : var.FUNC wrapping the UDF
        # fdsrc  : var.FEAT_DICT of source node features
        # fdedge : var.FEAT_DICT of edge features
        # fddst  : var.FEAT_DICT of destination node features
        self.fn = fn
        self.fdsrc = fdsrc
        self.fdedge = fdedge
        self.fddst = fddst
        self.ret = ret
    def opcode(self):
        return OpCode.EDGE_UDF
    def arg_vars(self):
        return [self.fn, self.fdsrc, self.fdedge, self.fddst]
    def ret_var(self):
        return self.ret
    def run(self):
        fn_data = self.fn.data
        src_data = self.fdsrc.data
        edge_data = self.fdedge.data
        dst_data = self.fddst.data
        udf_ret = fn_data(src_data, edge_data, dst_data)
        # wrap the UDF's dict result into a frame reference
        self.ret.data = FrameRef(Frame(udf_ret))
IR_REGISTRY[OpCode.EDGE_UDF] = {
    'name' : 'EDGE_UDF',
    # fn plus one FEAT_DICT each for src/edge/dst; the previous entry listed
    # only two FEAT_DICTs, under-counting the executor's four arguments.
    'args_type' : [VarType.FUNC, VarType.FEAT_DICT, VarType.FEAT_DICT,
                   VarType.FEAT_DICT],
    'ret_type' : VarType.FEAT_DICT,
    'executor_cls' : EdgeUDFExecutor,
}
def EDGE_UDF(fn, fdsrc, fdedge, fddst, ret=None):
    """Issue an EDGE_UDF op on the current program; return the result var.

    A fresh return variable is created unless ``ret`` is given.
    """
    reg = IR_REGISTRY[OpCode.EDGE_UDF]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](fn, fdsrc, fdedge, fddst, ret))
    return ret
class ReadExecutor(Executor):
    """Read one column of selected rows: ret = fd[row][col]."""
    def __init__(self, fd, row, col, ret):
        self.fd = fd
        self.row = row
        self.col = col
        self.ret = ret
    def opcode(self):
        return OpCode.READ
    def arg_vars(self):
        return [self.fd, self.row, self.col]
    def ret_var(self):
        return self.ret
    def run(self):
        frame = self.fd.data    # feature dict
        rows = self.row.data    # row index
        field = self.col.data   # column name
        self.ret.data = frame[rows][field]
IR_REGISTRY[OpCode.READ] = {
    'name' : 'READ',
    'args_type' : [VarType.FEAT_DICT, VarType.IDX, VarType.STR],
    'ret_type' : VarType.FEAT,
    'executor_cls' : ReadExecutor,
}
def READ(fd, row, col, ret=None):
    """Issue a READ op (fd[row][col]); return the result variable.

    A fresh return variable is created unless ``ret`` is given.
    """
    reg = IR_REGISTRY[OpCode.READ]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](fd, row, col, ret))
    return ret
class ReadColExecutor(Executor):
    """Read a whole column of a feature dict: ret = fd[col]."""
    def __init__(self, fd, col, ret):
        self.fd = fd
        self.col = col
        self.ret = ret
    def opcode(self):
        return OpCode.READ_COL
    def arg_vars(self):
        return [self.fd, self.col]
    def ret_var(self):
        return self.ret
    def run(self):
        frame, field = self.fd.data, self.col.data
        self.ret.data = frame[field]
IR_REGISTRY[OpCode.READ_COL] = {
    'name' : 'READ_COL',
    'args_type' : [VarType.FEAT_DICT, VarType.STR],
    'ret_type' : VarType.FEAT,
    'executor_cls' : ReadColExecutor,
}
def READ_COL(fd, col, ret=None):
    """Issue a READ_COL op (fd[col]); return the result variable.

    A fresh return variable is created unless ``ret`` is given.
    """
    reg = IR_REGISTRY[OpCode.READ_COL]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](fd, col, ret))
    return ret
class ReadRowExecutor(Executor):
    """Read selected rows of every column: ret = fd[row]."""
    def __init__(self, fd, row, ret):
        self.fd = fd
        self.row = row
        self.ret = ret
    def opcode(self):
        return OpCode.READ_ROW
    def arg_vars(self):
        return [self.fd, self.row]
    def ret_var(self):
        return self.ret
    def run(self):
        frame, rows = self.fd.data, self.row.data
        self.ret.data = frame[rows]
IR_REGISTRY[OpCode.READ_ROW] = {
    'name' : 'READ_ROW',
    'args_type' : [VarType.FEAT_DICT, VarType.IDX],
    'ret_type' : VarType.FEAT_DICT,
    'executor_cls' : ReadRowExecutor,
}
def READ_ROW(fd, row, ret=None):
    """Issue a READ_ROW op (fd[row]); return the result variable.

    A fresh return variable is created unless ``ret`` is given.
    """
    reg = IR_REGISTRY[OpCode.READ_ROW]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](fd, row, ret))
    return ret
class SPMVExecutor(Executor):
    """Sparse-dense matrix multiply: ret = spA @ B.

    ``spA`` holds a context-cached sparse matrix object; the concrete
    matrix is fetched on B's context before the multiply.
    """
    def __init__(self, spA, B, ret):
        self.spA = spA
        self.B = B
        self.ret = ret
    def opcode(self):
        return OpCode.SPMV
    def arg_vars(self):
        return [self.spA, self.B]
    def ret_var(self):
        return self.ret
    def run(self):
        spA_ctxobj = self.spA.data
        B = self.B.data
        # fetch the sparse matrix on the same device as B
        ctx = F.context(B)
        spA = spA_ctxobj.get(ctx)
        if F.ndim(B) == 1:
            # B is a vector, append a (1,) dim at the end
            B = F.unsqueeze(B, 1)
            C = F.spmm(spA, B)
            # drop the temporary dim so the result is a vector again
            C = F.squeeze(C, 1)
        else:
            C = F.spmm(spA, B)
        self.ret.data = C
IR_REGISTRY[OpCode.SPMV] = {
    'name' : 'SPMV',
    'args_type' : [VarType.SPMAT, VarType.FEAT],
    'ret_type' : VarType.FEAT,
    'executor_cls' : SPMVExecutor,
}
def SPMV(spA, B, ret=None):
    """Issue an SPMV op (spA @ B); return the result variable.

    A fresh return variable is created unless ``ret`` is given.
    """
    reg = IR_REGISTRY[OpCode.SPMV]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](spA, B, ret))
    return ret
class SPMVWithDataExecutor(Executor):
    """Sparse-dense matrix multiply with explicit non-zero values:
    ret = spA(A_data) @ B.

    The sparsity pattern comes from ``spA`` (context-cached); its non-zero
    values are replaced by ``A_data`` before the multiply.
    """
    def __init__(self, spA, A_data, B, ret):
        self.spA = spA
        self.A_data = A_data
        self.B = B
        self.ret = ret
    def opcode(self):
        return OpCode.SPMV_WITH_DATA
    def arg_vars(self):
        return [self.spA, self.A_data, self.B]
    def ret_var(self):
        return self.ret
    def run(self):
        spA_ctxobj = self.spA.data
        A_data = self.A_data.data
        if F.ndim(A_data) > 1:
            # A_data is of shape (E, 1). Squeeze the last dim.
            A_data = F.squeeze(A_data, 1)
        B = self.B.data
        # fetch the sparse matrix on the same device as B
        ctx = F.context(B)
        spA = spA_ctxobj.get(ctx)
        # rebuild the sparse matrix with the same indices/shape but new data
        spidx = F.sparse_matrix_indices(spA)
        shape = F.shape(spA)
        spA = F.sparse_matrix(A_data, spidx, shape)
        if F.ndim(B) == 1:
            # B is a vector, append a (1,) dim at the end
            B = F.unsqueeze(B, 1)
            C = F.spmm(spA, B)
            # drop the temporary dim so the result is a vector again
            C = F.squeeze(C, 1)
        else:
            C = F.spmm(spA, B)
        self.ret.data = C
IR_REGISTRY[OpCode.SPMV_WITH_DATA] = {
    'name' : 'SPMV_WITH_DATA',
    'args_type' : [VarType.SPMAT, VarType.FEAT, VarType.FEAT],
    'ret_type' : VarType.FEAT,
    'executor_cls' : SPMVWithDataExecutor,
}
def SPMV_WITH_DATA(spA, A_data, B, ret=None):
    """Issue an SPMV_WITH_DATA op (spA with values A_data, times B);
    return the result variable.

    A fresh return variable is created unless ``ret`` is given.
    """
    reg = IR_REGISTRY[OpCode.SPMV_WITH_DATA]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](spA, A_data, B, ret))
    return ret
class MergeRowExecutor(Executor):
    """Concatenate several feature dicts row-wise and reorder the rows.

    ``order`` gives the permutation that restores the ascending node-id
    order after the per-bucket dicts are concatenated (see the degree
    bucketing scheduler).
    """
    def __init__(self, order, fd_list, ret):
        self.order = order
        self.fd_list = fd_list
        self.ret = ret
    def opcode(self):
        return OpCode.MERGE_ROW
    def arg_vars(self):
        return [self.order] + self.fd_list
    def ret_var(self):
        return self.ret
    def run(self):
        # merge buckets according to the ascending order of the node ids.
        order_data = self.order.data
        fd_data = [fd.data for fd in self.fd_list]
        # assumes every dict has the same keys; take them from the first
        keys = fd_data[0].keys()
        all_fd = {key : F.cat([fd[key] for fd in fd_data], dim=0)
                  for key in keys}
        ret_fd = utils.reorder(all_fd, order_data)
        self.ret.data = ret_fd
IR_REGISTRY[OpCode.MERGE_ROW] = {
    'name' : 'MERGE_ROW',
    'args_type' : [VarType.IDX, VarType.IDX, '*', VarType.FEAT_DICT, '*'],
    'ret_type' : VarType.FEAT_DICT,
    'executor_cls' : MergeRowExecutor,
}
def MERGE_ROW(idx_list, fd_list, ret=None):
    """Issue a MERGE_ROW op; return the result variable.

    NOTE(review): despite the name, ``idx_list`` is a single order index
    variable (the executor stores it as ``order``); callers pass one
    var.IDX here.
    """
    reg = IR_REGISTRY[OpCode.MERGE_ROW]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](idx_list, fd_list, ret))
    return ret
class UpdateDictExecutor(Executor):
    """Merge two feature dicts into a new one; on key clash, fd2 wins.

    If either operand is lazy, the merge stays lazy via HybridDict;
    otherwise a plain dict copy of fd1 is updated with fd2.
    """
    def __init__(self, fd1, fd2, ret):
        self.fd1 = fd1
        self.fd2 = fd2
        self.ret = ret
    def opcode(self):
        return OpCode.UPDATE_DICT
    def arg_vars(self):
        return [self.fd1, self.fd2]
    def ret_var(self):
        return self.ret
    def run(self):
        fd1_data = self.fd1.data
        fd2_data = self.fd2.data
        if (isinstance(fd1_data, utils.LazyDict)
                or isinstance(fd2_data, utils.LazyDict)):
            # NOTE: fd2 has higher priority
            ret_data = utils.HybridDict(fd2_data, fd1_data)
        else:
            # shallow-copy fd1 so neither input dict is mutated
            ret_data = {k : v for k, v in fd1_data.items()}
            ret_data.update(fd2_data)
        self.ret.data = ret_data
IR_REGISTRY[OpCode.UPDATE_DICT] = {
    'name' : 'UPDATE_DICT',
    'args_type' : [VarType.FEAT_DICT, VarType.FEAT_DICT],
    'ret_type' : VarType.FEAT_DICT,
    'executor_cls' : UpdateDictExecutor,
}
def UPDATE_DICT(fd1, fd2, ret=None):
    """Issue an UPDATE_DICT op (merge fd1 and fd2, fd2 wins on clash);
    return the result variable.
    """
    reg = IR_REGISTRY[OpCode.UPDATE_DICT]
    ret = var.new(reg['ret_type']) if ret is None else ret
    get_current_prog().issue(reg['executor_cls'](fd1, fd2, ret))
    return ret
class Write_Executor(Executor):
    """Mutable op: write a value into one column of selected rows,
    fd[col][row] = val.
    """
    def __init__(self, fd, row, col, val):
        self.fd = fd
        self.row = row
        self.col = col
        self.val = val
    def opcode(self):
        return OpCode.WRITE_
    def arg_vars(self):
        return [self.fd, self.row, self.col, self.val]
    def ret_var(self):
        # mutable op: nothing is returned
        return None
    def run(self):
        fd_data = self.fd.data  # feature dict
        row_data = self.row.data  # idx
        col_data = self.col.data  # key str
        val_data = self.val.data
        fd_data[col_data][row_data] = val_data
IR_REGISTRY[OpCode.WRITE_] = {
    'name' : 'WRITE_',
    'args_type' : [VarType.FEAT_DICT, VarType.IDX, VarType.STR, VarType.FEAT],
    'ret_type' : None,
    'executor_cls' : Write_Executor,
}
def WRITE_(fd, row, col, val):
    """Issue a WRITE_ op: fd[col][row] = val (in place, no return)."""
    reg = IR_REGISTRY[OpCode.WRITE_]
    get_current_prog().issue(reg['executor_cls'](fd, row, col, val))
class WriteCol_Executor(Executor):
    """Mutable op: overwrite one column of a feature dict, fd[col] = val."""
    def __init__(self, fd, col, val):
        self.fd = fd
        self.col = col
        self.val = val
    def opcode(self):
        return OpCode.WRITE_COL_
    def arg_vars(self):
        return [self.fd, self.col, self.val]
    def ret_var(self):
        # mutable op: nothing is returned
        return None
    def run(self):
        target = self.fd.data    # feature dict
        field = self.col.data    # column name
        target[field] = self.val.data
IR_REGISTRY[OpCode.WRITE_COL_] = {
    'name' : 'WRITE_COL_',
    'args_type' : [VarType.FEAT_DICT, VarType.STR, VarType.FEAT],
    'ret_type' : None,
    'executor_cls' : WriteCol_Executor,
}
def WRITE_COL_(fd, col, val):
    """Issue a WRITE_COL_ op: fd[col] = val (in place, no return)."""
    reg = IR_REGISTRY[OpCode.WRITE_COL_]
    get_current_prog().issue(reg['executor_cls'](fd, col, val))
class WriteRow_Executor(Executor):
    """Mutable op: overwrite selected rows of a feature dict,
    fd[row] = val.
    """
    def __init__(self, fd, row, val):
        self.fd = fd
        self.row = row
        self.val = val
    def opcode(self):
        return OpCode.WRITE_ROW_
    def arg_vars(self):
        return [self.fd, self.row, self.val]
    def ret_var(self):
        # mutable op: nothing is returned
        return None
    def run(self):
        fd_data = self.fd.data  # feature dict
        row_data = self.row.data  # idx
        val_data = self.val.data
        fd_data[row_data] = val_data
IR_REGISTRY[OpCode.WRITE_ROW_] = {
    'name' : 'WRITE_ROW_',
    'args_type' : [VarType.FEAT_DICT, VarType.IDX, VarType.FEAT_DICT],
    'ret_type' : None,
    'executor_cls' : WriteRow_Executor,
}
def WRITE_ROW_(fd, row, val):
    """Issue a WRITE_ROW_ op: fd[row] = val (in place, no return)."""
    reg = IR_REGISTRY[OpCode.WRITE_ROW_]
    get_current_prog().issue(reg['executor_cls'](fd, row, val))
class WriteDict_Executor(Executor):
    """Mutable op: copy every column of fd2 into fd1 (fd1[k] = fd2[k])."""
    def __init__(self, fd1, fd2):
        self.fd1 = fd1
        self.fd2 = fd2
    def opcode(self):
        return OpCode.WRITE_DICT_
    def arg_vars(self):
        return [self.fd1, self.fd2]
    def ret_var(self):
        # mutable op: nothing is returned
        return None
    def run(self):
        target = self.fd1.data
        for key, value in self.fd2.data.items():
            target[key] = value
IR_REGISTRY[OpCode.WRITE_DICT_] = {
    'name' : 'WRITE_DICT_',
    'args_type' : [VarType.FEAT_DICT, VarType.FEAT_DICT],
    'ret_type' : None,
    'executor_cls' : WriteDict_Executor,
}
def WRITE_DICT_(fd1, fd2):
    """Issue a WRITE_DICT_ op: copy fd2's columns into fd1 (in place)."""
    reg = IR_REGISTRY[OpCode.WRITE_DICT_]
    get_current_prog().issue(reg['executor_cls'](fd1, fd2))
class AppendRow_Executor(Executor):
    """Mutable op: append fd2's rows to fd1 (used by schedule_send to
    grow the message frame).
    """
    def __init__(self, fd1, fd2):
        self.fd1 = fd1
        self.fd2 = fd2
    def opcode(self):
        return OpCode.APPEND_ROW_
    def arg_vars(self):
        return [self.fd1, self.fd2]
    def ret_var(self):
        # mutable op: nothing is returned
        return None
    def run(self):
        fd1_data = self.fd1.data
        fd2_data = self.fd2.data
        fd1_data.append(fd2_data)
IR_REGISTRY[OpCode.APPEND_ROW_] = {
    'name' : 'APPEND_ROW_',
    'args_type' : [VarType.FEAT_DICT, VarType.FEAT_DICT],
    'ret_type' : None,
    'executor_cls' : AppendRow_Executor,
}
def APPEND_ROW_(fd1, fd2):
    """Issue an APPEND_ROW_ op: append fd2's rows to fd1 (in place)."""
    reg = IR_REGISTRY[OpCode.APPEND_ROW_]
    get_current_prog().issue(reg['executor_cls'](fd1, fd2))
from __future__ import absolute_import
from contextlib import contextmanager
from .registry import IR_REGISTRY
class Prog(object):
    """A program: an ordered list of executors to run.

    Attributes
    ----------
    execs : list
        Executors, in the order they were issued.
    varcount : int
        Counter used by ``var.new`` to generate fresh variable names.
    """
    def __init__(self):
        self.execs = []
        self.varcount = 0
    def issue(self, exe):
        """Append an executor to the program."""
        self.execs.append(exe)
    def pprint_exe(self, exe):
        """Pretty-print one executor in a pseudo-assembly form."""
        argstr = ', '.join([str(av) for av in exe.arg_vars()])
        # use the Executor interface method rather than reaching into the
        # concrete executor's `ret` attribute, which mutable ops lack
        ret = exe.ret_var()
        if ret is None:
            # stmt (mutable op, no return variable)
            print("%s(%s)" % (
                IR_REGISTRY[exe.opcode()]['name'],
                argstr))
        else:
            print("%s %s = %s(%s)" % (
                ret.typestr(),
                ret.name,
                IR_REGISTRY[exe.opcode()]['name'],
                argstr))
    def pprint(self):
        """Pretty-print the whole program."""
        for exe in self.execs:
            self.pprint_exe(exe)
# The program currently being built; managed via the prog() context manager.
_current_prog = None
def get_current_prog():
    """Return the program currently being built, or None."""
    global _current_prog
    return _current_prog
def set_current_prog(prog):
    """Install ``prog`` as the current program."""
    global _current_prog
    _current_prog = prog
@contextmanager
def prog():
    """Scoped current program: create a fresh Prog, yield it, always reset.

    The reset is in a ``finally`` so that an exception raised inside the
    ``with`` body cannot leak a stale program into the next schedule call.
    """
    set_current_prog(Prog())
    try:
        yield get_current_prog()
    finally:
        set_current_prog(None)
"""Module for ir registry."""
from __future__ import absolute_import
IR_REGISTRY = {}
from __future__ import absolute_import
from .program import get_current_prog
class VarType(object):
    """Type codes for IR variables."""
    # Types for symbolic objects (i.e., they might not be
    # concretized before evaluation).
    FEAT = 0
    FEAT_DICT = 1
    # Types for concrete objects (i.e., they must have values).
    SPMAT = 2
    IDX = 3
    STR = 4
    FUNC = 5
# Printable name for each VarType code; indexed by the type value.
VAR_TYPE_NAME_MAP = [
    'Feat',
    'FeatDict',
    'SpMat',
    'Idx',
    'Str',
    'Func',
]
class Var(object):
    """Variable

    Attributes
    ----------
    name : str
        Variable name (auto-generated by ``new`` when not provided).
    type : int
        One of the VarType codes.
    data : any, default=None (not concretized)
        Concrete payload; filled in by executors when the program runs.
    """
    __slots__ = ['name', 'type', 'data']
    def __init__(self, name, type, data):
        self.name = name
        self.type = type
        self.data = data
    def __str__(self):
        # string vars print their payload so pprint shows the literal inline
        if self.type == VarType.STR:
            return '"%s"' % self.data
        else:
            return self.name
    def typestr(self):
        """Return the human-readable name of this variable's type."""
        return VAR_TYPE_NAME_MAP[self.type]
def new(type, data=None, name=None):
    """Create a new variable; auto-name it from the current program's counter
    when no name is given.
    """
    if name is None:
        cur_prog = get_current_prog()
        name = '_z%d' % cur_prog.varcount
        cur_prog.varcount += 1
    return Var(name, type, data)
# Convenience constructors, one per VarType code.
def FEAT(data=None, name=None):
    return new(VarType.FEAT, data, name)
def FEAT_DICT(data=None, name=None):
    return new(VarType.FEAT_DICT, data, name)
def SPMAT(data=None, name=None):
    return new(VarType.SPMAT, data, name)
def IDX(data=None, name=None):
    return new(VarType.IDX, data, name)
def STR(data=None, name=None):
    return new(VarType.STR, data, name)
def FUNC(data=None, name=None):
    return new(VarType.FUNC, data, name)
"""DGL mini-runtime."""
class Runtime(object):
    """Executes a scheduled program by running its executors in issue order."""
    @staticmethod
    def run(prog):
        """Run every executor of ``prog`` sequentially."""
        for exe in prog.execs:
            #prog.pprint_exe(exe)
            exe.run()
"""For different schedulers"""
from __future__ import absolute_import
from .. import utils
from .._ffi.function import _init_api
from ..base import ALL, DGLError, is_all
from .. import backend as F
from ..frame import frame_like, FrameRef
from ..function.base import BuiltinFunction, BundledFunction
from ..udf import EdgeBatch, NodeBatch
from . import ir
from .ir import var as var
from . import degree_bucketing as db
from . import spmv
# Public scheduling entry points of this module.
__all__ = [
    "schedule_send",
    "schedule_recv",
    "schedule_update_all",
    "schedule_snr",
    "schedule_apply_nodes",
    "schedule_apply_edges",
    "schedule_push",
    "schedule_pull"
]
def schedule_send(graph, u, v, eid, message_func):
    """get send schedule
    Parameters
    ----------
    graph: DGLGraph
        The DGLGraph to use
    u : utils.Index
        Source nodes
    v : utils.Index
        Destination nodes
    eid : utils.Index
        Ids of sending edges
    message_func: callable or list of callable
        The message function
    """
    # wrap frames and index arguments into IR variables
    var_nf = var.FEAT_DICT(graph._node_frame)
    var_ef = var.FEAT_DICT(graph._edge_frame)
    var_mf = var.FEAT_DICT(graph._msg_frame)
    var_u, var_v, var_eid = var.IDX(u), var.IDX(v), var.IDX(eid)
    # compute messages and append them to the message frame
    msg = _gen_send(graph, var_nf, var_ef, var_u, var_v, var_eid, message_func)
    # TODO: handle duplicate messages
    ir.APPEND_ROW_(var_mf, msg)
def schedule_recv(graph, recv_nodes, reduce_func, apply_func):
    """Schedule recv.
    Parameters
    ----------
    graph: DGLGraph
        The DGLGraph to use
    recv_nodes : utils.Index
        Nodes to recv.
    reduce_func: callable or list of callable
        The reduce function
    apply_func: callable
        The apply node function
    """
    nf = var.FEAT_DICT(graph._node_frame, name='nf')
    # sort and unique the argument
    recv_nodes, _ = F.sort_1d(F.unique(recv_nodes.tousertensor()))
    recv_nodes = utils.toindex(recv_nodes)
    reduced_feat = _gen_reduce(graph, reduce_func, recv_nodes)
    var_recv_nodes = var.IDX(recv_nodes, name='recv_nodes')
    if apply_func:
        # To avoid writing reduced features back to node frame and reading
        # them again for the apply phase, first read the node
        # features and "merge" them with the reduced features.
        v_nf = ir.READ_ROW(nf, var_recv_nodes)
        v_nf = ir.UPDATE_DICT(v_nf, reduced_feat)
        def _afunc_wrapper(node_data):
            nb = NodeBatch(graph, recv_nodes, node_data)
            return apply_func(nb)
        afunc = var.FUNC(_afunc_wrapper)
        applied_feat = ir.NODE_UDF(afunc, v_nf)
        # applied features win over reduced ones on key clash
        final_feat = ir.UPDATE_DICT(reduced_feat, applied_feat)
    else:
        final_feat = reduced_feat
    ir.WRITE_ROW_(nf, var_recv_nodes, final_feat)
def _gen_reduce(graph, reduce_func, recv_nodes):
    """Generate the reduce schedule for a recv call.

    Parameters
    ----------
    graph : DGLGraph
        Graph whose message graph holds the pending messages.
    reduce_func : callable or list of builtins
        The reduce function.
    recv_nodes : utils.Index
        Receiving nodes; sorted and uniqued by the caller.

    Returns
    -------
    var.FEAT_DICT
        Variable for the reduced features (one row per recv node).
    """
    call_type = "recv"
    _, dst, mid = graph._msg_graph.in_edges(recv_nodes)
    rfunc = _standardize_func_usage(reduce_func)
    rfunc_is_list = utils.is_iterable(rfunc)
    # Create a tmp frame to hold the feature data.
    # The frame has the same size and schemes of the
    # node frame.
    # TODO(minjie): should replace this with an IR call to make the program stateless.
    tmpframe = FrameRef(frame_like(graph._node_frame._frame, len(recv_nodes)))
    # vars
    msg = var.FEAT_DICT(graph._msg_frame, 'msg')
    nf = var.FEAT_DICT(graph._node_frame, 'nf')
    out = var.FEAT_DICT(data=tmpframe)
    if rfunc_is_list:
        # UDF message + builtin reducer
        # analyze e2v spmv
        spmv_rfunc, rfunc = spmv.analyze_e2v_spmv(graph, rfunc)
        inc = spmv.build_inc_matrix(call_type, graph, mid, dst)
        spmv.gen_e2v_spmv_schedule(inc, spmv_rfunc, msg, out)
        if len(rfunc) == 0:
            # All mfunc and rfunc has been processed.
            return out
        # convert the remaining rfunc to UDFs
        rfunc = BundledFunction(rfunc)
    # gen degree bucketing schedule for UDF recv
    db.gen_degree_bucketing_schedule(graph, rfunc, mid, dst,
                                     recv_nodes, nf, msg, out)
    return out
def schedule_snr(graph,
                 edge_tuples,
                 message_func,
                 reduce_func,
                 apply_func):
    """Schedule send_and_recv on the given edges.

    Parameters
    ----------
    graph : DGLGraph
        The DGLGraph to use
    edge_tuples : (u, v, eid) tuple of utils.Index
        Source nodes, destination nodes and edge ids of the triggered edges.
    message_func : callable or list of callable
        The message function
    reduce_func : callable or list of callable
        The reduce function
    apply_func : callable
        The apply node function
    """
    call_type = 'send_and_recv'
    u, v, eid = edge_tuples
    # reduced features are batched in unique-ascending dst order
    recv_nodes, _ = F.sort_1d(F.unique(v.tousertensor()))
    recv_nodes = utils.toindex(recv_nodes)
    # create vars
    var_nf = var.FEAT_DICT(graph._node_frame, name='nf')
    var_u = var.IDX(u)
    var_v = var.IDX(v)
    var_eid = var.IDX(eid)
    var_recv_nodes = var.IDX(recv_nodes, name='recv_nodes')
    # generate send and reduce schedule
    reduced_feat = _gen_send_reduce(call_type, graph,
        message_func, reduce_func, (var_u, var_v, var_eid), recv_nodes)
    # generate apply schedule
    if apply_func:
        # To avoid writing reduced features back to node frame and reading
        # them again for the apply phase, first read the node
        # features and "merge" them with the reduced features.
        v_nf = ir.READ_ROW(var_nf, var_recv_nodes)
        v_nf = ir.UPDATE_DICT(v_nf, reduced_feat)
        def _afunc_wrapper(node_data):
            nb = NodeBatch(graph, recv_nodes, node_data)
            return apply_func(nb)
        afunc = var.FUNC(_afunc_wrapper)
        applied_feat = ir.NODE_UDF(afunc, v_nf)
        final_feat = ir.UPDATE_DICT(reduced_feat, applied_feat)
    else:
        final_feat = reduced_feat
    ir.WRITE_ROW_(var_nf, var_recv_nodes, final_feat)
def _gen_send_reduce(
        call_type,
        graph,
        message_func,
        reduce_func,
        edge_tuples,
        recv_nodes):
    """Generate send and reduce schedule.
    This guarantees that the returned reduced features are batched
    in the *unique-ascending* order of the edge destination node ids.

    Parameters
    ----------
    call_type : str
        Either 'send_and_recv' or 'update_all' (selects how the adjacency /
        incidence matrices are built).
    graph : DGLGraph
    message_func : callable, list of builtins
    reduce_func : callable, list of builtins
    edge_tuples : (u, v, eid) tuple of var.Var
    recv_nodes : utils.index

    Returns
    -------
    var.FEAT_DICT
        Variable for the reduced features, one row per recv node.
    """
    # arg vars
    var_u, var_v, var_eid = edge_tuples
    var_nf = var.FEAT_DICT(graph._node_frame, name='nf')
    var_ef = var.FEAT_DICT(graph._edge_frame, name='ef')
    # format the input functions
    mfunc = _standardize_func_usage(message_func)
    rfunc = _standardize_func_usage(reduce_func)
    mfunc_is_list = utils.is_iterable(mfunc)
    rfunc_is_list = utils.is_iterable(rfunc)
    # Create a tmp frame to hold the feature data.
    # The frame has the same size and schemes of the
    # node frame.
    # TODO(minjie): should replace this with an IR call to make the program stateless.
    tmpframe = FrameRef(frame_like(graph._node_frame._frame, len(recv_nodes)))
    var_out = var.FEAT_DICT(data=tmpframe)
    if mfunc_is_list and rfunc_is_list:
        # builtin message + builtin reducer
        # analyze v2v spmv
        spmv_pairs, mfunc, rfunc = spmv.analyze_v2v_spmv(graph, mfunc, rfunc)
        adj = spmv.build_adj_matrix(call_type, graph, var_u.data, var_v.data)
        spmv.gen_v2v_spmv_schedule(adj, spmv_pairs, var_nf, var_ef, var_eid, var_out)
        if len(mfunc) == 0:
            # All mfunc and rfunc have been converted to v2v spmv.
            return var_out
    if mfunc_is_list:
        # Two cases:
        # - mfunc is builtin while rfunc is UDF.
        # - mfunc and rfunc are both builtin but some combinations
        #   fall through from the v2v spmv analysis.
        # In both cases, convert the mfunc to UDF.
        mfunc = BundledFunction(mfunc)
    # generate UDF send schedule
    var_mf = _gen_send(graph, var_nf, var_ef, var_u, var_v, var_eid, mfunc)
    if rfunc_is_list:
        # UDF message + builtin reducer
        # analyze e2v spmv
        spmv_rfunc, rfunc = spmv.analyze_e2v_spmv(graph, rfunc)
        inc = spmv.build_inc_matrix(call_type, graph, var_eid.data, var_v.data)
        spmv.gen_e2v_spmv_schedule(inc, spmv_rfunc, var_mf, var_out)
        if len(rfunc) == 0:
            # All mfunc and rfunc has been processed.
            return var_out
        # convert the remaining rfunc to UDFs
        rfunc = BundledFunction(rfunc)
    # gen degree bucketing schedule for UDF recv
    mid = utils.toindex(slice(0, len(var_v.data)))  # message id is from 0~|dst|
    db.gen_degree_bucketing_schedule(graph, rfunc,
                                     mid, var_v.data, recv_nodes,
                                     var_nf, var_mf, var_out)
    return var_out
def _gen_send(graph, nf, ef, u, v, eid, mfunc):
    """Generate the UDF send schedule: read the src/dst/edge feature rows,
    apply the message function and return the variable holding the
    resulting message feature dict.
    """
    fdsrc = ir.READ_ROW(nf, u)
    fddst = ir.READ_ROW(nf, v)
    fdedge = ir.READ_ROW(ef, eid)
    def _mfunc_wrapper(src_data, edge_data, dst_data):
        eb = EdgeBatch(graph, (u.data, v.data, eid.data),
                       src_data, edge_data, dst_data)
        return mfunc(eb)
    _mfunc_wrapper = var.FUNC(_mfunc_wrapper)
    msg = ir.EDGE_UDF(_mfunc_wrapper, fdsrc, fdedge, fddst)
    return msg
def schedule_update_all(graph, message_func, reduce_func, apply_func):
    """get send and recv schedule
    Parameters
    ----------
    graph: DGLGraph
        The DGLGraph to use
    message_func: callable or list of callable
        The message function
    reduce_func: callable or list of callable
        The reduce function
    apply_func: callable
        The apply node function
    """
    call_type = 'update_all'
    src, dst, _ = graph._graph.edges()
    eid = utils.toindex(slice(0, graph.number_of_edges()))  # shortcut for ALL
    recv_nodes = utils.toindex(slice(0, graph.number_of_nodes()))  # shortcut for ALL
    # create vars
    var_nf = var.FEAT_DICT(graph._node_frame, name='nf')
    var_recv_nodes = var.IDX(recv_nodes, name='recv_nodes')
    var_src = var.IDX(src)
    var_dst = var.IDX(dst)
    var_eid = var.IDX(eid)
    # generate send + reduce
    reduced_feat = _gen_send_reduce(call_type, graph,
        message_func, reduce_func, (var_src, var_dst, var_eid), recv_nodes)
    # generate optional apply
    if apply_func:
        # To avoid writing reduced features back to node frame and reading
        # them again for the apply phase, first read the node
        # features and "merge" them with the reduced features.
        v_nf = ir.READ_ROW(var_nf, var_recv_nodes)
        v_nf = ir.UPDATE_DICT(v_nf, reduced_feat)
        def _afunc_wrapper(node_data):
            nb = NodeBatch(graph, recv_nodes, node_data)
            return apply_func(nb)
        afunc = var.FUNC(_afunc_wrapper)
        applied_feat = ir.NODE_UDF(afunc, v_nf)
        final_feat = ir.UPDATE_DICT(reduced_feat, applied_feat)
    else:
        final_feat = reduced_feat
    # every node receives, so write the whole dict back
    ir.WRITE_DICT_(var_nf, final_feat)
def schedule_apply_nodes(graph, v, apply_func):
    """get apply nodes schedule
    Parameters
    ----------
    graph: DGLGraph
        The DGLGraph to use
    v : utils.Index
        Nodes to apply
    apply_func: callable
        The apply node function
    Returns
    -------
    None
        The executors are issued into the current program, not returned.
    """
    var_nf = var.FEAT_DICT(graph._node_frame, name='nf')
    var_v = var.IDX(v)
    v_nf = ir.READ_ROW(var_nf, var_v)
    def _afunc_wrapper(node_data):
        nb = NodeBatch(graph, v, node_data)
        return apply_func(nb)
    afunc = var.FUNC(_afunc_wrapper)
    applied_feat = ir.NODE_UDF(afunc, v_nf)
    # write the applied features back to the node frame
    ir.WRITE_ROW_(var_nf, var_v, applied_feat)
def schedule_apply_edges(graph, u, v, eid, apply_func):
    """get apply edges schedule
    Parameters
    ----------
    graph: DGLGraph
        The DGLGraph to use
    u : utils.Index
        Source nodes of edges to apply
    v : utils.Index
        Destination nodes of edges to apply
    eid : utils.Index
        Ids of sending edges
    apply_func: callable
        The apply edge function
    Returns
    -------
    None
        The executors are issued into the current program, not returned.
    """
    # vars
    var_nf = var.FEAT_DICT(graph._node_frame, name='nf')
    var_ef = var.FEAT_DICT(graph._edge_frame, name='ef')
    var_u = var.IDX(u)
    var_v = var.IDX(v)
    var_eid = var.IDX(eid)
    # schedule apply edges
    fdsrc = ir.READ_ROW(var_nf, var_u)
    fddst = ir.READ_ROW(var_nf, var_v)
    fdedge = ir.READ_ROW(var_ef, var_eid)
    def _efunc_wrapper(src_data, edge_data, dst_data):
        eb = EdgeBatch(graph, (u, v, eid),
                       src_data, edge_data, dst_data)
        return apply_func(eb)
    _efunc = var.FUNC(_efunc_wrapper)
    new_fdedge = ir.EDGE_UDF(_efunc, fdsrc, fdedge, fddst)
    # write the applied features back to the edge frame
    ir.WRITE_ROW_(var_ef, var_eid, new_fdedge)
def schedule_push(graph, u, message_func, reduce_func, apply_func):
    """get push schedule
    Parameters
    ----------
    graph: DGLGraph
        The DGLGraph to use
    u : utils.Index
        Source nodes for push
    message_func: callable or list of callable
        The message function
    reduce_func: callable or list of callable
        The reduce function
    apply_func: callable
        The apply node function
    Returns
    -------
    [] when ``u`` has no out-going edges; otherwise None (the schedule is
    issued into the current program by schedule_snr).
    """
    # FIXME: for now, use send_and_recv to implement push
    u, v, eid = graph._graph.out_edges(u)
    if len(eid) == 0:
        # no edges are triggered; nothing to schedule
        return []
    schedule_snr(graph, (u, v, eid), message_func, reduce_func, apply_func)
def schedule_pull(graph, v, message_func, reduce_func, apply_func):
    """get pull schedule
    Parameters
    ----------
    graph: DGLGraph
        The DGLGraph to use
    v : utils.Index
        Destination nodes for pull
    message_func: callable or list of callable
        The message function
    reduce_func: callable or list of callable
        The reduce function
    apply_func: callable
        The apply node function
    Returns
    -------
    [] when ``v`` has no in-coming edges; otherwise None (the schedule is
    issued into the current program by schedule_snr).
    """
    # FIXME: for now, use send_and_recv to implement pull
    u, v, eid = graph._graph.in_edges(v)
    if len(eid) == 0:
        # no edges are triggered; nothing to schedule
        return []
    schedule_snr(graph, (u, v, eid), message_func, reduce_func, apply_func)
def _check_builtin_func_list(func_list):
    """Raise DGLError unless every entry of ``func_list`` is a builtin.

    The previous version used a backslash line-continuation inside the
    string literal, which embedded a run of indentation spaces in the
    middle of the error message; implicit string concatenation avoids that.
    """
    for fn in func_list:
        if not isinstance(fn, BuiltinFunction):
            raise DGLError("If specify multiple message/reduce functions, "
                           "all of them must be builtin")
def _standardize_func_usage(func):
    """Standardize usages of message and reduce functions
    Message or reduce funtion can be:
    1. a UDF
    2. a dgl builtin function
    3. a list of dgl builtin function
    This function checks if func meets the requirement, and merges last two
    cases by putting a single builtin function into a list
    Returns:
    One single UDF function or a list of builtin function
    """
    if utils.is_iterable(func):
        # a list: every member must be a builtin
        _check_builtin_func_list(func)
        return func
    if isinstance(func, BuiltinFunction):
        # a single builtin: normalize to a one-element list
        return [func]
    # a single UDF: pass through unchanged
    return func
# Register the scheduler's C API symbols into this module.
_init_api("dgl.runtime.scheduler")
"""Module for SPMV rules."""
from __future__ import absolute_import
from ..base import DGLError
from .. import backend as F
from .. import utils
from . import ir
from .ir import var as var
def analyze_v2v_spmv(graph, mfunc, rfunc):
    """Analyze if SPMV from node space to node space can be applied.
    Parameters
    ----------
    graph: DGLGraph
        DGLGraph to use
    mfunc : list of dgl.function.BuiltinFunction
        The message function list.
    rfunc : list of dgl.function.BuiltinFunction
        The reduce function list.
    Returns
    -------
    spmv_pairs : list of pair of builtin functions
        The pair of spvm applicable message/reduce functions.
    mfunc_left: list
        A list of message functions that can't use v2v spmv. In other
        words, these message functions need to be materialized
    rfunc_left: list
        A list of reduce functions that can't use v2v spmv
    """
    spmv_pairs = []
    mfunc_left = []
    rfunc_left = []
    # index message functions by the field they produce
    out2mfunc = {fn.out_field: fn for fn in mfunc}
    kept_mflds = set()
    for rfn in rfunc:
        mfld = rfn.msg_field
        mfn = out2mfunc.get(mfld)
        if mfn is None:
            raise DGLError('Reduce function requires message field "%s",'
                           ' but no message function generates it.' % mfld)
        # FIXME: should pre-compile a look up table
        if mfn.is_spmv_supported(graph) and rfn.is_spmv_supported():
            spmv_pairs.append((mfn, rfn))
        else:
            # keep each leftover message function at most once
            if mfld not in kept_mflds:
                kept_mflds.add(mfld)
                mfunc_left.append(mfn)
            rfunc_left.append(rfn)
    return spmv_pairs, mfunc_left, rfunc_left
def analyze_e2v_spmv(graph, rfunc):
    """Analyze if SPMV from edge space to node space can be applied.

    Parameters
    ----------
    graph: DGLGraph
        DGLGraph to use
    rfunc : list of dgl.function.BuiltinFunction
        The reduce function list.

    Returns
    -------
    spmv_rfunc : list
        A list of SPMV-applicable reduce builtins.
    rfunc_left : list
        A list of reduce builtins that are not applicable.
    """
    spmv_rfunc = []
    rfunc_left = []
    for rfn in rfunc:
        # route each reducer into the applicable / not-applicable bin
        target = spmv_rfunc if rfn.is_spmv_supported() else rfunc_left
        target.append(rfn)
    return spmv_rfunc, rfunc_left
def gen_v2v_spmv_schedule(adjmat, spmv_pairs, nf, ef, eid, out):
    """Emit IR that computes node-to-node SPMV for each message/reduce pair.

    Parameters
    ----------
    adjmat : sparse matrix
        The adjacency matrix.
    spmv_pairs : list of pair
        Pairs of (message builtin, reduce builtin) to execute via SPMV.
    nf : var.Var
        Input node features.
    ef : var.Var
        Input edge features.
    eid : var.Var
        The eid index.
    out : var.Var
        Output node features.
    """
    adj = var.SPMAT(adjmat)
    for mfn, rfn in spmv_pairs:
        if mfn.use_edge_feature:
            # edge-weighted SPMV: read the edge column first, then src data
            edata = ir.READ(ef, eid, var.STR(mfn.edge_field))
            sdata = ir.READ_COL(nf, var.STR(mfn.src_field))
            result = ir.SPMV_WITH_DATA(adj, edata, sdata)
        else:
            sdata = ir.READ_COL(nf, var.STR(mfn.src_field))
            result = ir.SPMV(adj, sdata)
        # write the reduced column into the output frame
        ir.WRITE_COL_(out, var.STR(rfn.out_field), result)
def gen_e2v_spmv_schedule(inc, spmv_rfunc, mf, out):
    """Emit IR that reduces messages onto nodes via incidence-matrix SPMV.

    Parameters
    ----------
    inc : sparse matrix
        The incidence matrix.
    spmv_rfunc : list of builtin reducers
        The SPMV-applicable reduce functions.
    mf : var.Var
        Variable for the message frame.
    out : var.Var
        Variable for the output reduced features.
    """
    inc_mat = var.SPMAT(inc)
    for rfn in spmv_rfunc:
        msg_col = ir.READ_COL(mf, var.STR(rfn.msg_field))
        reduced = ir.SPMV(inc_mat, msg_col)
        ir.WRITE_COL_(out, var.STR(rfn.out_field), reduced)
def build_adj_matrix(call_type, graph, u, v):
    """Build the adjacency matrix for the given call type.

    Parameters
    ----------
    call_type : str
        Either "update_all" or "send_and_recv".
    graph : DGLGraph
        The graph.
    u : utils.Index
        Src nodes (used for "send_and_recv").
    v : utils.Index
        Dst nodes (used for "send_and_recv").

    Returns
    -------
    utils.CtxCachedObject
        Wrapper that materializes the matrix on a requested context.
    """
    if call_type == "update_all":
        # full graph case: use the graph's own adjacency matrix
        return utils.CtxCachedObject(lambda ctx : graph.adjacency_matrix(ctx=ctx))
    if call_type == "send_and_recv":
        # edgeset case: adjacency restricted to the given (u, v) edges
        edge_mat = build_adj_matrix_uv(graph, u, v)
        return utils.CtxCachedObject(lambda ctx : F.copy_to(edge_mat, ctx))
    raise DGLError('Invalid call type:', call_type)
def build_adj_matrix_index_uv(graph, u, v):
    """Build adj matrix index and shape using the given (u, v) edges.

    The matrix is of shape (len(unique(v)), n), where n is the number of nodes
    in the graph. Therefore, when doing SPMV, the src node data
    should be all the node features.

    The dst nodes will be sorted in the *unique-ascending* order of
    their ids. This is compatible with other reduce scheduler such as
    degree-bucketing scheduler.

    Parameters
    ----------
    graph : DGLGraph
        The graph
    u : utils.Index
        Src nodes.
    v : utils.Index
        Dst nodes.

    Returns
    -------
    sparse index
        The sparse index, as a ('coo', idx) pair.
    tuple of int
        The dense shape (num unique dst nodes, num graph nodes).
    """
    # relabel dst nodes into the compact range [0, len(unique(v)))
    new2old, old2new = utils.build_relabel_map(v)
    u = u.tousertensor()
    v = v.tousertensor()
    new_v = old2new[v]  # FIXME(minjie): no use []
    n = graph.number_of_nodes()
    m = len(new2old)
    # COO layout: row = relabeled dst id, col = original src id
    row = F.unsqueeze(new_v, 0)
    col = F.unsqueeze(u, 0)
    idx = F.cat([row, col], dim=0)
    return ('coo', idx), (m, n)
def build_adj_matrix_uv(graph, u, v):
    """Build adj matrix using the given (u, v) edges.

    The matrix is of shape (len(unique(v)), n), where n is the number of
    nodes in the graph (the index and shape come from
    build_adj_matrix_index_uv). Therefore, when doing SPMV, the src node
    data should be all the node features.

    Parameters
    ----------
    graph : DGLGraph
        The graph
    u : utils.Index
        Src nodes.
    v : utils.Index
        Dst nodes.

    Returns
    -------
    Sparse matrix
        The adjacency matrix on CPU
    """
    sp_idx, shape = build_adj_matrix_index_uv(graph, u, v)
    nnz = len(u)  # one nonzero entry per (u, v) edge
    # FIXME(minjie): data type
    dat = F.ones((nnz,), dtype=F.float32, ctx=F.cpu())
    mat = F.sparse_matrix(dat, sp_idx, shape)
    return mat
def build_inc_matrix(call_type, graph, eid, v):
    """Build the in-edge incidence matrix for the given call type.

    Parameters
    ----------
    call_type : str
        One of "update_all", "send_and_recv" or "recv".
    graph : DGLGraph
        The graph.
    eid : utils.Index
        Message/edge ids (used for "recv").
    v : utils.Index
        Dst nodes (used for "send_and_recv" and "recv").

    Returns
    -------
    utils.CtxCachedObject
        Wrapper that materializes the matrix on a requested context.
    """
    if call_type == "update_all":
        # full graph case
        return utils.CtxCachedObject(lambda ctx : graph.incidence_matrix(type='in', ctx=ctx))
    elif call_type == "send_and_recv":
        # edgeset case: message i corresponds to dst node v[i]
        mat = build_inc_matrix_v(v)
        return utils.CtxCachedObject(lambda ctx : F.copy_to(mat, ctx))
    elif call_type == "recv":
        # dst nodeset case: messages identified by explicit edge ids
        mat = build_inc_matrix_eid(eid, v)
        return utils.CtxCachedObject(lambda ctx : F.copy_to(mat, ctx))
    else:
        raise DGLError('Invalid call type:', call_type)
def build_inc_matrix_eid(eid, v):
    """Build a spmat of shape (n, m), where n=len(unique(v)), m=len(eid).

    Invariant: len(eid) == len(v)

    The dst nodes will be sorted in the *unique-ascending* order of
    their ids. This is compatible with other reduce scheduler such as
    degree-bucketing scheduler.

    Parameters
    ----------
    eid : utils.Index
        The edge id of each message.
    v : utils.Index
        The dst node of each message.
    """
    # relabel v to range(0, len(unique(v)))
    new2old, old2new = utils.build_relabel_map(v)
    v = v.tousertensor()
    eid = eid.tousertensor()
    new_v = old2new[v]  # FIXME(minjie): no use []
    # create sparse index tensor: row = relabeled dst node, col = edge id
    m = len(eid)
    n = len(new2old)
    row = F.unsqueeze(new_v, 0)
    col = F.unsqueeze(eid, 0)
    idx = F.cat([row, col], dim=0)
    # create dat tensor; all nonzeros are 1 so SPMV sums the messages
    nnz = len(eid)
    dat = F.ones((nnz,), dtype=F.float32, ctx=F.cpu())
    return F.sparse_matrix(dat, ('coo', idx), (n, m))
def build_inc_matrix_v(v):
    """Build a spmat of shape (n, m), where n=len(unique(v)), m=len(v).

    Parameters
    ----------
    v : utils.Index
        The dst node of each message.
    """
    # message i is carried by the i-th edge, so edge ids are 0..len(v)-1
    all_eids = utils.toindex(F.arange(0, len(v)))
    return build_inc_matrix_eid(all_eids, v)
"""Schedule policies for graph computation."""
from __future__ import absolute_import
import numpy as np
from .base import ALL, DGLError
from . import backend as F
from collections import defaultdict as ddict
from .function import message as fmsg
from .function import reducer as fred
from .udf import NodeBatch, EdgeBatch
from . import utils
from ._ffi.function import _init_api
__all__ = ["degree_bucketing", "get_recv_executor", "get_executor"]
def degree_bucketing(graph, v):
    """Create degree bucketing scheduling policy.

    Parameters
    ----------
    graph : dgl.graph_index.GraphIndex
        the graph
    v : dgl.utils.Index
        the nodes to gather messages

    Returns
    -------
    unique_degrees : list of int
        list of unique degrees
    v_bkt : list of dgl.utils.Index
        list of node id buckets; nodes belonging to the same bucket have
        the same degree
    """
    in_degs = np.array(graph.in_degrees(v).tolist())
    node_ids = np.array(v.tolist())
    unique_degrees = list(np.unique(in_degs))
    # one bucket of node ids per distinct in-degree (ascending order)
    v_bkt = [utils.Index(node_ids[np.where(in_degs == deg)])
             for deg in unique_degrees]
    return unique_degrees, v_bkt
def _process_buckets(buckets):
    """Unpack the auxiliary data returned by the degree-bucketing C API.

    ``buckets`` is a callable FFI result; index 0 holds the degrees,
    1 the unique dst nodes, 2 the node section sizes, 3 the message ids
    and 4 the message section sizes.
    """
    # get back results
    degs = utils.toindex(buckets(0))
    v_tensor = utils.toindex(buckets(1)).tousertensor()
    # TODO: convert directly from ndarary to python list?
    v_section = buckets(2).asnumpy().tolist()
    mid_tensor = utils.toindex(buckets(3)).tousertensor()
    msg_section = buckets(4).asnumpy().tolist()
    # split the flat id tensors into one chunk per bucket
    dst_chunks = F.split(v_tensor, v_section, dim=0)
    mid_chunks = F.split(mid_tensor, msg_section, dim=0)
    # convert everything back to utils.Index
    unique_v = utils.toindex(v_tensor)
    dsts = [utils.toindex(chunk) for chunk in dst_chunks]
    msg_ids = [utils.toindex(chunk) for chunk in mid_chunks]
    return unique_v, degs, dsts, msg_ids
def light_degree_bucketing(v):
    """Return the bucketing by degree scheduling for destination nodes of messages

    Parameters
    ----------
    v: utils.Index
        destination node for each message

    Returns
    -------
    unique_v: utils.Index
        unique destination nodes
    degrees: utils.Index
        A list of degree for each bucket
    v_bkt: list of utils.Index
        A list of node id buckets, nodes in each bucket have the same degree
    msg_ids: list of utils.Index
        A list of message id buckets, each node in the ith node id bucket has
        degree[i] messages in the ith message id bucket
    """
    # bucketing itself is done in C; _process_buckets unpacks the result
    buckets = _CAPI_DGLDegreeBucketing(v.todgltensor())
    return _process_buckets(buckets)
def light_degree_bucketing_for_graph(graph):
    """Return the bucketing by degree scheduling for the entire graph.

    Parameters
    ----------
    graph: GraphIndex
        The graph index whose edges all carry one message each.

    Returns
    -------
    unique_v: utils.Index
        unique destination nodes
    degrees: utils.Index
        A list of degree for each bucket
    v_bkt: list of utils.Index
        A list of node id buckets, nodes in each bucket have the same degree
    msg_ids: list of utils.Index
        A list of message id buckets, each node in the ith node id bucket has
        degree[i] messages in the ith message id bucket
    """
    # Bug fix: this is a module-level function, so `self` does not exist
    # here (the old code raised NameError on every call). The C handle
    # must come from the `graph` argument instead.
    # NOTE(review): assumes GraphIndex stores its C handle as `_handle`
    # — confirm against graph_index.py.
    buckets = _CAPI_DGLDegreeBucketingFromGraph(graph._handle)
    return _process_buckets(buckets)
class Executor(object):
    """Abstract base for objects that run a piece of graph computation."""

    def run(self):
        """Execute this executor and return the new node features.

        TODO(minjie): extend this to support computation on edges.
        """
        raise NotImplementedError
class SPMVOperator(Executor):
    """Executor that reduces messages with a sparse-dense matmul (spmm).

    The adjacency matrix is built lazily by ``adj_build_fn`` on the same
    device context as the source feature column.
    """

    def __init__(self, src_field, edge_field, dst_field, use_edge_feat,
                 node_repr, adj_build_fn):
        # field names of the source node / edge feature and the output
        self.src_field = src_field
        self.edge_field = edge_field
        self.dst_field = dst_field
        self.use_edge_feat = use_edge_feat
        self.node_repr = node_repr
        self.adj_build_fn = adj_build_fn

    def run(self):
        """Perform the SPMV and return ``{dst_field: result}``."""
        feat = self.node_repr[self.src_field]
        ctx = F.context(feat)
        # build adjmat on the feature's device
        adjmat = self.adj_build_fn(self.edge_field, ctx, self.use_edge_feat)
        is_vector = len(F.shape(feat)) == 1
        if is_vector:
            # spmm needs a 2-D operand; lift (N,) to (N, 1) and squeeze back
            feat = F.unsqueeze(feat, 1)
        result = F.spmm(adjmat, feat)
        if is_vector:
            result = F.squeeze(result, 1)
        return {self.dst_field : result}
# FIXME: refactorize in scheduler/executor redesign
class DegreeBucketingExecutor(Executor):
    """Executor that runs a UDF reducer using degree bucketing.

    Destination nodes are grouped by in-degree so that the messages of
    each bucket can be reshaped into one dense batch before calling the
    reduce function.
    """
    def __init__(self, g, rfunc, message_frame, edges=None):
        # g: DGLGraph; rfunc: reduce UDF; message_frame: frame of sent messages
        self.g = g
        self.rfunc = rfunc
        self.msg_frame = message_frame
        # calc degree bucketing schedule
        if edges is not None:
            # only the given (src, dst) edges carry messages; bucket by dst
            unique_v, degs, dsts, msg_ids = light_degree_bucketing(edges[1])
        else:
            # every edge of the graph carries a message
            unique_v, degs, dsts, msg_ids = light_degree_bucketing_for_graph(g._graph)
        self._recv_nodes = unique_v
        self.degrees = degs
        self.dsts = dsts
        self.msg_ids = msg_ids

    @property
    def recv_nodes(self):
        # unique destination nodes that receive reduced features
        return self._recv_nodes

    def run(self):
        """Reduce messages bucket by bucket; return the new node reprs."""
        new_reprs = []
        # loop over each bucket
        # FIXME (lingfan): handle zero-degree case
        for deg, vv, msg_id in zip(self.degrees, self.dsts, self.msg_ids):
            v_data = self.g.get_n_repr(vv)
            in_msgs = self.msg_frame.select_rows(msg_id)
            def _reshape_fn(msg):
                # batch this bucket's messages as (num_nodes, degree, ...)
                msg_shape = F.shape(msg)
                new_shape = (len(vv), deg) + msg_shape[1:]
                return F.reshape(msg, new_shape)
            # lazily reshape each message column only when the UDF reads it
            reshaped_in_msgs = utils.LazyDict(
                lambda key: _reshape_fn(in_msgs[key]), self.msg_frame.schemes)
            nb = NodeBatch(self.g, vv, v_data, reshaped_in_msgs)
            new_reprs.append(self.rfunc(nb))
        # Pack all reducer results together
        keys = new_reprs[0].keys()
        new_reprs = {key : F.cat([repr[key] for repr in new_reprs], dim=0)
                     for key in keys}
        return new_reprs
class BasicExecutor(Executor):
    """Executor for one builtin (message, reduce) pair via SPMV.

    Subclasses supply the node/edge representations and the adjacency
    build function; this class selects the matching SPMVOperator.
    """
    def __init__(self, graph, mfunc, rfunc):
        self.g = graph
        self.exe = self._build_exec(mfunc, rfunc)

    @property
    def node_repr(self):
        # input node features; provided by subclass
        raise NotImplementedError

    @property
    def edge_repr(self):
        # input edge features; provided by subclass
        raise NotImplementedError

    @property
    def recv_nodes(self):
        # nodes whose features run() updates; provided by subclass
        raise NotImplementedError

    def _build_exec(self, mfunc, rfunc):
        # copy_src -> plain SPMV; src_mul_edge -> SPMV with edge data
        if isinstance(mfunc, fmsg.CopySrcMessageFunction):
            exe = SPMVOperator(src_field=mfunc.src_field,
                               edge_field=None,
                               dst_field=rfunc.out_field,
                               use_edge_feat=False,
                               node_repr=self.node_repr,
                               adj_build_fn=self._adj_build_fn)
        elif isinstance(mfunc, fmsg.SrcMulEdgeMessageFunction):
            exe = SPMVOperator(src_field=mfunc.src_field,
                               edge_field=mfunc.edge_field,
                               dst_field=rfunc.out_field,
                               use_edge_feat=True,
                               node_repr=self.node_repr,
                               adj_build_fn=self._adj_build_fn)
        else:
            raise NotImplementedError("message func type {}".format(type(mfunc)))
        return exe

    def run(self):
        # delegate to the underlying SPMV operator
        return self.exe.run()
class UpdateAllExecutor(BasicExecutor):
    """SPMV executor for update_all (messages along every edge).

    Node/edge representations and the adjacency shape are fetched
    lazily and cached after the first access.
    """
    def __init__(self, graph, mfunc, rfunc):
        self._init_state()
        super(UpdateAllExecutor, self).__init__(graph, mfunc, rfunc)

    def _init_state(self):
        # lazily-populated caches; filled on first property access
        self._node_repr = None
        self._edge_repr = None
        self._graph_idx = None
        self._graph_shape = None
        self._recv_nodes = None

    @property
    def graph_shape(self):
        # full-graph adjacency is square: (num_nodes, num_nodes)
        if self._graph_shape is None:
            n = self.g.number_of_nodes()
            self._graph_shape = [n, n]
        return self._graph_shape

    @property
    def recv_nodes(self):
        # update_all writes to every node
        return ALL

    @property
    def node_repr(self):
        if self._node_repr is None:
            self._node_repr = self.g.get_n_repr()
        return self._node_repr

    @property
    def edge_repr(self):
        if self._edge_repr is None:
            self._edge_repr = self.g.get_e_repr()
        return self._edge_repr

    def _adj_build_fn(self, edge_field, ctx, use_edge_feat):
        # Build the adjacency matrix on ctx; when an edge feature is used,
        # its column replaces the matrix values (weighted adjacency).
        if use_edge_feat:
            dat = self.edge_repr[edge_field]
            if len(F.shape(dat)) > 1:
                # The edge feature is of shape (N, 1)
                dat = F.squeeze(dat, 1)
            idx = F.sparse_matrix_indices(self.g.adjacency_matrix(ctx=ctx))
            adjmat = F.sparse_matrix(dat, idx, self.graph_shape)
        else:
            adjmat = self.g.adjacency_matrix(ctx=ctx)
        return adjmat
class SendRecvExecutor(BasicExecutor):
    """SPMV executor for send_and_recv along an explicit (src, dst) edge set.

    The restricted adjacency matrix (relabeled dst rows vs. all-node
    columns) is built lazily and cached.
    """
    def __init__(self, graph, src, dst, mfunc, rfunc):
        self._init_state(src, dst)
        super(SendRecvExecutor, self).__init__(graph, mfunc, rfunc)

    def _init_state(self, src, dst):
        # broadcast src/dst to equal-length edge endpoint lists
        self.u, self.v = utils.edge_broadcasting(src, dst)
        # lazily-populated caches; filled on first property access
        self._node_repr = None
        self._edge_repr = None
        self._graph_idx = None
        self._graph_shape = None
        self._recv_nodes = None

    @property
    def graph_idx(self):
        # COO index of the restricted adjacency matrix
        if self._graph_idx is None:
            self._build_adjmat()
        return self._graph_idx

    @property
    def graph_shape(self):
        # (num unique dst nodes, num graph nodes)
        if self._graph_shape is None:
            self._build_adjmat()
        return self._graph_shape

    @property
    def recv_nodes(self):
        # dst nodes in unique-ascending order of their original ids
        if self._recv_nodes is None:
            self._build_adjmat()
        return self._recv_nodes

    @property
    def node_repr(self):
        if self._node_repr is None:
            self._node_repr = self.g.get_n_repr()
        return self._node_repr

    @property
    def edge_repr(self):
        if self._edge_repr is None:
            self._edge_repr = self.g.get_e_repr((self.u, self.v))
        return self._edge_repr

    def _build_adjmat(self):
        # handle graph index: relabel dst nodes to compact row ids
        new2old, old2new = utils.build_relabel_map(self.v)
        u = self.u.tousertensor()
        v = self.v.tousertensor()
        # TODO(minjie): should not directly use []
        new_v = old2new[v]
        n = self.g.number_of_nodes()
        m = len(new2old)
        # COO layout: row = relabeled dst, col = original src
        self._graph_idx = F.cat(
            [F.unsqueeze(new_v, 0), F.unsqueeze(u, 0)], dim=0)
        self._graph_shape = [m, n]
        self._recv_nodes = new2old

    def _adj_build_fn(self, edge_field, ctx, use_edge_feat):
        # Values are the edge feature column when requested, else all ones.
        if use_edge_feat:
            dat = self.edge_repr[edge_field]
            if len(F.shape(dat)) > 1:
                # edge feature is of shape (N, 1)
                dat = F.squeeze(dat, dim=1)
        else:
            dat = F.ones((len(self.u), ), dtype=F.float32, ctx=ctx)
        adjmat = F.sparse_matrix(
            dat, ('coo', F.copy_to(self.graph_idx, ctx)), self.graph_shape)
        return adjmat
class BundledExecutor(BasicExecutor):
    """Base class for bundled execution of several builtin function pairs.

    All shared structure, such as the graph index, should be cached in
    this class or its subclass. BundledUpdateAllExecutor and
    BundledSendRecvExecutor should subclass BundledExecutor.
    """
    def __init__(self, graph, mfunc, rfunc):
        self.g = graph
        func_pairs = self._match_message_with_reduce(mfunc, rfunc)
        # create one SPMV executor per (message, reduce) pair
        self.executors = self._build_executors(func_pairs)

    def _build_executors(self, func_pairs):
        # _build_exec comes from BasicExecutor
        executors = []
        for mfunc, rfunc in func_pairs:
            exe = self._build_exec(mfunc, rfunc)
            executors.append(exe)
        return executors

    def _match_message_with_reduce(self, mfunc, rfunc):
        # pair each reducer with the message function producing its field
        out2mfunc = {fn.out_field: fn for fn in mfunc.fn_list}
        func_pairs = []
        for rfn in rfunc.fn_list:
            mfn = out2mfunc.get(rfn.msg_field, None)
            if mfn is None:
                raise DGLError('Cannot find message field "%s".' % rfn.msg_field)
            func_pairs.append((mfn, rfn))
        return func_pairs

    def run(self):
        # run every executor and merge their output dicts
        attr = None
        for exe in self.executors:
            res = exe.run()
            if attr is None:
                attr = res
            else:
                # attr and res must be dict
                attr.update(res)
        return attr
class BundledUpdateAllExecutor(BundledExecutor, UpdateAllExecutor):
    """Bundled SPMV executor for update_all.

    Combines BundledExecutor's pair matching with UpdateAllExecutor's
    lazy state (set up here via _init_state).
    """
    def __init__(self, graph, mfunc, rfunc):
        self._init_state()
        BundledExecutor.__init__(self, graph, mfunc, rfunc)
class BundledSendRecvExecutor(BundledExecutor, SendRecvExecutor):
    """Bundled SPMV executor for send_and_recv.

    Combines BundledExecutor's pair matching with SendRecvExecutor's
    lazy state (set up here via _init_state).
    """
    def __init__(self, graph, src, dst, mfunc, rfunc):
        self._init_state(src, dst)
        BundledExecutor.__init__(self, graph, mfunc, rfunc)
def _is_spmv_supported(fn, graph=None):
    """Return whether ``fn`` (a builtin message or reduce function)
    supports the SPMV optimization. Anything else (e.g. a UDF) is
    reported as unsupported.
    """
    # FIXME: also take into account
    # (1) which backend DGL is under.
    # (2) whether the graph is a multigraph.
    #
    # Current SPMV optimizer assumes that duplicate entries are summed up
    # in sparse matrices, which is the case for PyTorch but not MXNet.
    # The result is that on multigraphs, SPMV can still work for reducer=sum
    # and message=copy_src/src_mul_edge *only in PyTorch*.
    if isinstance(fn, fmsg.MessageFunction):
        return fn.is_spmv_supported(graph)
    if isinstance(fn, fred.ReduceFunction):
        return fn.is_spmv_supported()
    return False
def _create_update_all_exec(graph, **kwargs):
    """Create an SPMV executor for update_all, or None if SPMV cannot apply.

    kwargs must contain 'message_func' and 'reduce_func'; a None return
    signals the caller to fall back to non-SPMV execution.
    """
    mfunc = kwargs.pop('message_func')
    rfunc = kwargs.pop('reduce_func')
    if isinstance(mfunc, (list, tuple)) or isinstance(rfunc, (list, tuple)):
        # multiple builtins: bundle them into one executor
        mfunc = fmsg.BundledMessageFunction(mfunc)
        rfunc = fred.BundledReduceFunction(rfunc)
        exec_cls = BundledUpdateAllExecutor
    else:
        exec_cls = UpdateAllExecutor
    if _is_spmv_supported(mfunc, graph) and _is_spmv_supported(rfunc):
        return exec_cls(graph, mfunc=mfunc, rfunc=rfunc)
    else:
        return None
def _create_send_and_recv_exec(graph, **kwargs):
    """Create an SPMV executor for send_and_recv, or None if SPMV cannot apply.

    kwargs must contain 'src', 'dst', 'message_func' and 'reduce_func';
    a None return signals the caller to fall back to non-SPMV execution.
    """
    src = kwargs.pop('src')
    dst = kwargs.pop('dst')
    mfunc = kwargs.pop('message_func')
    rfunc = kwargs.pop('reduce_func')
    if (isinstance(mfunc, fmsg.BundledMessageFunction)
            or isinstance(rfunc, fred.BundledReduceFunction)):
        # if either side is bundled, bundle the other side too
        if not isinstance(mfunc, fmsg.BundledMessageFunction):
            mfunc = fmsg.BundledMessageFunction(mfunc)
        if not isinstance(rfunc, fred.BundledReduceFunction):
            rfunc = fred.BundledReduceFunction(rfunc)
        exec_cls = BundledSendRecvExecutor
    else:
        exec_cls = SendRecvExecutor
    if _is_spmv_supported(mfunc, graph) and _is_spmv_supported(rfunc):
        return exec_cls(graph, src=src, dst=dst, mfunc=mfunc, rfunc=rfunc)
    else:
        return None
def get_executor(call_type, graph, **kwargs):
    """Return an SPMV executor for the given call type.

    Returns None for unknown call types or when SPMV is not applicable
    (the per-call-type builders also return None in that case).
    """
    if call_type == "update_all":
        return _create_update_all_exec(graph, **kwargs)
    if call_type == "send_and_recv":
        return _create_send_and_recv_exec(graph, **kwargs)
    return None
def get_recv_executor(graph, reduce_func, message_frame, edges=None):
    """Create executor for recv phase

    Parameters
    ----------
    graph: DGLGraph
        DGLGraph on which to perform recv
    reduce_func: callable
        The reduce function
    message_frame: FrameRef
        Message frame
    edges: tuple/list of utils.Index
        src and dst Index representing edges along which messages are sent.
        If not specified, all edges of graph are used instead.

    Returns
    -------
    DegreeBucketingExecutor
        Executor that reduces the messages bucket-by-bucket.
    """
    # FIXME: handle builtin spmv executor case
    return DegreeBucketingExecutor(graph, reduce_func, message_frame, edges)
_init_api("dgl.scheduler")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment