Commit 653428bd authored by Lingfan Yu, committed by Minjie Wang

[Feature][Kernel] DGL kernel support (#596)

* [Kernel] Minigun integration and fused kernel support (#519)

* kernel interface

* add minigun

* Add cuda build

* functors

* working on binary elewise

* binary reduce

* change kernel interface

* WIP

* wip

* fix minigun

* compile

* binary reduce kernels

* compile

* simple test passed

* more reducers

* fix thrust problem

* fix cmake

* fix cmake; add proper guard for atomic

* WIP: bcast

* WIP

* bcast kernels

* update to new minigun pass-by-value practice

* broadcasting dim

* add copy src and copy edge

* fix linking

* fix none array problem

* fix copy edge

* add device_type and device_id to backend operator

* cache csr adj, remove cache for adjmat and incmat

* custom ops in backend and pytorch impl

* change dgl-mg kernel python interface

* add id_mapping var

* clean up plus v2e spmv schedule

* spmv schedule & clean up fall back

* symbolic message and reduce func, remove bundle func

* new executors

* new backend interface for dgl kernels and pytorch impl

* minor fix

* fix

* fix docstring, comments, func names

* nodeflow

* fix message id mapping and bugs...

* pytorch test case & fix

* backward binary reduce

* fix bug

* WIP: cusparse

* change to int32 csr for cusparse workaround

* disable cusparse

* change back to int64

* broadcasting backward

* cusparse; WIP: add rev_csr

* unit test for kernels

* pytorch backward with dgl kernel

* edge softmax

* fix backward

* improve softmax

* cache edge on device

* cache mappings on device

* fix partial forward code

* cusparse done

* copy_src_sum with cusparse

* rm id getter

* reduce grad for broadcast

* copy edge reduce backward

* kernel unit test for broadcasting

* full kernel unit test

* add cpu kernels

* edge softmax unit test

* missing ref

* fix compile and small bugs

* fix bug in bcast

* Add backward both

* fix torch utests

* expose infershape

* create out tensor in python

* fix c++ lint

* [Kernel] Add GPU utest and kernel utest (#524)

* fix gpu utest

* cuda utest runnable

* temp disable test nodeflow; unified test for kernel

* cuda test kernel done

* [Kernel] Update kernel branch (#550)

* [Model] add multiprocessing training with sampling. (#484)

* reorganize sampling code.

* add multi-process training.

* speed up gcn_cv

* fix graphsage_cv.

* add new API in graph store.

* update barrier impl.

* support both local and distributed training.

* fix multiprocess train.

* fix.

* fix barrier.

* add script for loading data.

* multiprocessing sampling.

* accel training.

* replace pull with spmv for speedup.

* nodeflow copy from parent with context.

* enable GPU.

* fix a bug in graph store.

* enable multi-GPU training.

* fix lint.

* add comments.

* rename to run_store_server.py

* fix gcn_cv.

* fix a minor bug in sampler.

* handle error better in graph store.

* improve graphsage_cv for distributed mode.

* update README.

* fix.

* update.

* [Tutorial] add sampling tutorial. (#522)

* add sampling tutorial.

* add readme

* update author list.

* fix indent in the code.

* rename the file.

* update tutorial.

* fix the last API.

* update image.

* [BUGFIX] fix the problems in the sampling tutorial. (#523)

* add index.

* update.

* update tutorial.

* fix gpu utest

* cuda utest runnable

* temp disable test nodeflow; unified test for kernel

* cuda test kernel done

* Fixing typo in JTNN after interface change (#536)

* [BugFix] Fix getting src and dst id of ALL edges in NodeFlow.apply_block (#515)

* [Bug Fix] Fix inplace op at backend (#546)

* Fix inplace operation

* fix line seprator

* [Feature] Add batch and unbatch for immutable graph (#539)

* Add batch and unbatch for immutable graph

* fix line seprator

* fix lintr

* remove unnecessary include

* fix code review

* [BUGFix] Improve multi-processing training (#526)

* fix.

* add comment.

* remove.

* temp fix.

* initialize for shared memory.

* fix graphsage.

* fix gcn.

* add more unit tests.

* add more tests.

* avoid creating shared-memory exclusively.

* redefine remote initializer.

* improve initializer.

* fix unit test.

* fix lint.

* fix lint.

* initialize data in the graph store server properly.

* fix test.

* fix test.

* fix test.

* small fix.

* add comments.

* cleanup server.

* test graph store with a random port.

* print.

* print to stderr.

* test1

* test2

* remove comment.

* adjust the initializer signature.

* [API] update graph store API. (#549)

* add init_ndata and init_edata in DGLGraph.

* adjust SharedMemoryGraph API.

* print warning.

* fix comment.

* update example

* fix.

* fix examples.

* add unit tests.

* add comments.

* [Refactor] Immutable graph index (#543)

* WIP

* header

* WIP .cc

* WIP

* transpose

* wip

* immutable graph .h and .cc

* WIP: nodeflow.cc

* compile

* remove all tmp dl managed ctx; they caused refcount issue

* one simple test

* WIP: testing

* test_graph

* fix graph index

* fix bug in sampler; pass pytorch utest

* WIP on mxnet

* fix lint

* fix mxnet unittest w/ unfortunate workaround

* fix msvc

* fix lint

* SliceRows and test_nodeflow

* resolve reviews

* resolve reviews

* try fix win ci

* try fix win ci

* poke win ci again

* poke

* lazy multigraph flag; stackoverflow error

* revert node subgraph test

* lazy object

* try fix win build

* try fix win build

* poke ci

* fix build script

* fix compile

* add a todo

* fix reviews

* fix compile

* [Kernel] Update kernel branch (#576)

* [Model] add multiprocessing training with sampling. (#484)

* reorganize sampling code.

* add multi-process training.

* speed up gcn_cv

* fix graphsage_cv.

* add new API in graph store.

* update barrier impl.

* support both local and distributed training.

* fix multiprocess train.

* fix.

* fix barrier.

* add script for loading data.

* multiprocessing sampling.

* accel training.

* replace pull with spmv for speedup.

* nodeflow copy from parent with context.

* enable GPU.

* fix a bug in graph store.

* enable multi-GPU training.

* fix lint.

* add comments.

* rename to run_store_server.py

* fix gcn_cv.

* fix a minor bug in sampler.

* handle error better in graph store.

* improve graphsage_cv for distributed mode.

* update README.

* fix.

* update.

* [Tutorial] add sampling tutorial. (#522)

* add sampling tutorial.

* add readme

* update author list.

* fix indent in the code.

* rename the file.

* update tutorial.

* fix the last API.

* update image.

* [BUGFIX] fix the problems in the sampling tutorial. (#523)

* add index.

* update.

* update tutorial.

* fix gpu utest

* cuda utest runnable

* temp disable test nodeflow; unified test for kernel

* cuda test kernel done

* Fixing typo in JTNN after interface change (#536)

* [BugFix] Fix getting src and dst id of ALL edges in NodeFlow.apply_block (#515)

* [Bug Fix] Fix inplace op at backend (#546)

* Fix inplace operation

* fix line seprator

* [Feature] Add batch and unbatch for immutable graph (#539)

* Add batch and unbatch for immutable graph

* fix line seprator

* fix lintr

* remove unnecessary include

* fix code review

* [BUGFix] Improve multi-processing training (#526)

* fix.

* add comment.

* remove.

* temp fix.

* initialize for shared memory.

* fix graphsage.

* fix gcn.

* add more unit tests.

* add more tests.

* avoid creating shared-memory exclusively.

* redefine remote initializer.

* improve initializer.

* fix unit test.

* fix lint.

* fix lint.

* initialize data in the graph store server properly.

* fix test.

* fix test.

* fix test.

* small fix.

* add comments.

* cleanup server.

* test graph store with a random port.

* print.

* print to stderr.

* test1

* test2

* remove comment.

* adjust the initializer signature.

* [API] update graph store API. (#549)

* add init_ndata and init_edata in DGLGraph.

* adjust SharedMemoryGraph API.

* print warning.

* fix comment.

* update example

* fix.

* fix examples.

* add unit tests.

* add comments.

* [Refactor] Immutable graph index (#543)

* WIP

* header

* WIP .cc

* WIP

* transpose

* wip

* immutable graph .h and .cc

* WIP: nodeflow.cc

* compile

* remove all tmp dl managed ctx; they caused refcount issue

* one simple test

* WIP: testing

* test_graph

* fix graph index

* fix bug in sampler; pass pytorch utest

* WIP on mxnet

* fix lint

* fix mxnet unittest w/ unfortunate workaround

* fix msvc

* fix lint

* SliceRows and test_nodeflow

* resolve reviews

* resolve reviews

* try fix win ci

* try fix win ci

* poke win ci again

* poke

* lazy multigraph flag; stackoverflow error

* revert node subgraph test

* lazy object

* try fix win build

* try fix win build

* poke ci

* fix build script

* fix compile

* add a todo

* fix reviews

* fix compile

* all demo use python-3 (#555)

* [DEMO] Reproduce numbers of distributed training in AMLC giant graph paper (#556)

* update

* update

* update

* update num_hops

* fix bug

* update

* report numbers of distributed training in AMLC giant graph paper

* [DEMO] Remove duplicate code for sampling (#557)

* update

* update

* re-use single-machine code

* update

* use relative path

* update

* update

* update

* add __init__.py

* add __init__.py

* import sys, os

* fix typo

* update

* [Perf] Improve performance of graph store. (#554)

* fix.

* use inplace.

* move to shared memory graph store.

* fix.

* add more unit tests.

* fix.

* fix test.

* fix test.

* disable test.

* fix.

* [BUGIFX] fix a bug in edge_ids (#560)

* add test.

* fix compute.

* fix test.

* turn on test.

* fix a bug.

* add test.

* fix.

* disable test.

* [DEMO] Add Pytorch demo for distributed sampler (#562)

* update

* update

* update

* add sender

* update

* remove duplicate cpde

* [Test] Add gtest to project (#547)

* add gtest module

* add gtest

* fix

* Update CMakeLists.txt

* Update README.md

* [Perf] lazily create msg_index. (#563)

* lazily create msg_index.

* update test.

* [BUGFIX] fix bugs for running GCN on giant graphs. (#561)

* load mxnet csr.

* enable load large csr.

* fix

* fix.

* fix int overflow.

* fix test.

* [BugFix] Fix error when bfs_level = 0 in Entity Classification with RGCN (#559)

* [DEMO] Update demo of distributed sampler (#564)

* update

* update

* update demo

* add network cpp test (#565)

* Add unittest for C++ RPC (#566)

* [CI] Fix CI for cpp test (#570)

* fix CI for cpp test

* update port number

* [Docker] update docker image (#575)

* update docker image

* specify lint version

* rm torch import from unified tests

* [Kernel][Scheduler][MXNet] Scheduler for DGL kernels and MXNet backend support (#541)

* [Model] add multiprocessing training with sampling. (#484)

* reorganize sampling code.

* add multi-process training.

* speed up gcn_cv

* fix graphsage_cv.

* add new API in graph store.

* update barrier impl.

* support both local and distributed training.

* fix multiprocess train.

* fix.

* fix barrier.

* add script for loading data.

* multiprocessing sampling.

* accel training.

* replace pull with spmv for speedup.

* nodeflow copy from parent with context.

* enable GPU.

* fix a bug in graph store.

* enable multi-GPU training.

* fix lint.

* add comments.

* rename to run_store_server.py

* fix gcn_cv.

* fix a minor bug in sampler.

* handle error better in graph store.

* improve graphsage_cv for distributed mode.

* update README.

* fix.

* update.

* [Tutorial] add sampling tutorial. (#522)

* add sampling tutorial.

* add readme

* update author list.

* fix indent in the code.

* rename the file.

* update tutorial.

* fix the last API.

* update image.

* [BUGFIX] fix the problems in the sampling tutorial. (#523)

* add index.

* update.

* update tutorial.

* fix gpu utest

* cuda utest runnable

* temp disable test nodeflow; unified test for kernel

* cuda test kernel done

* edge softmax module

* WIP

* Fixing typo in JTNN after interface change (#536)

* mxnet backend support

* improve reduce grad

* add max to unittest backend

* fix kernel unittest

* [BugFix] Fix getting src and dst id of ALL edges in NodeFlow.apply_block (#515)

* lint

* lint

* win build

* [Bug Fix] Fix inplace op at backend (#546)

* Fix inplace operation

* fix line seprator

* [Feature] Add batch and unbatch for immutable graph (#539)

* Add batch and unbatch for immutable graph

* fix line seprator

* fix lintr

* remove unnecessary include

* fix code review

* [BUGFix] Improve multi-processing training (#526)

* fix.

* add comment.

* remove.

* temp fix.

* initialize for shared memory.

* fix graphsage.

* fix gcn.

* add more unit tests.

* add more tests.

* avoid creating shared-memory exclusively.

* redefine remote initializer.

* improve initializer.

* fix unit test.

* fix lint.

* fix lint.

* initialize data in the graph store server properly.

* fix test.

* fix test.

* fix test.

* small fix.

* add comments.

* cleanup server.

* test graph store with a random port.

* print.

* print to stderr.

* test1

* test2

* remove comment.

* adjust the initializer signature.

* try

* fix

* fix

* fix

* fix

* fix

* try

* test

* test

* test

* try

* try

* try

* test

* fix

* try gen_target

* fix gen_target

* fix msvc var_args expand issue

* fix

* [API] update graph store API. (#549)

* add init_ndata and init_edata in DGLGraph.

* adjust SharedMemoryGraph API.

* print warning.

* fix comment.

* update example

* fix.

* fix examples.

* add unit tests.

* add comments.

* [Refactor] Immutable graph index (#543)

* WIP

* header

* WIP .cc

* WIP

* transpose

* wip

* immutable graph .h and .cc

* WIP: nodeflow.cc

* compile

* remove all tmp dl managed ctx; they caused refcount issue

* one simple test

* WIP: testing

* test_graph

* fix graph index

* fix bug in sampler; pass pytorch utest

* WIP on mxnet

* fix lint

* fix mxnet unittest w/ unfortunate workaround

* fix msvc

* fix lint

* SliceRows and test_nodeflow

* resolve reviews

* resolve reviews

* try fix win ci

* try fix win ci

* poke win ci again

* poke

* lazy multigraph flag; stackoverflow error

* revert node subgraph test

* lazy object

* try fix win build

* try fix win build

* poke ci

* fix build script

* fix compile

* add a todo

* fix reviews

* fix compile

* WIP

* WIP

* all demo use python-3 (#555)

* ToImmutable and CopyTo

* [DEMO] Reproduce numbers of distributed training in AMLC giant graph paper (#556)

* update

* update

* update

* update num_hops

* fix bug

* update

* report numbers of distributed training in AMLC giant graph paper

* [DEMO] Remove duplicate code for sampling (#557)

* update

* update

* re-use single-machine code

* update

* use relative path

* update

* update

* update

* add __init__.py

* add __init__.py

* import sys, os

* fix typo

* update

* [Perf] Improve performance of graph store. (#554)

* fix.

* use inplace.

* move to shared memory graph store.

* fix.

* add more unit tests.

* fix.

* fix test.

* fix test.

* disable test.

* fix.

* [BUGIFX] fix a bug in edge_ids (#560)

* add test.

* fix compute.

* fix test.

* turn on test.

* fix a bug.

* add test.

* fix.

* disable test.

* DGLRetValue DGLContext conversion

* [DEMO] Add Pytorch demo for distributed sampler (#562)

* update

* update

* update

* add sender

* update

* remove duplicate cpde

* [Test] Add gtest to project (#547)

* add gtest module

* add gtest

* fix

* Update CMakeLists.txt

* Update README.md

* Add support to convert immutable graph to 32 bits

* [Perf] lazily create msg_index. (#563)

* lazily create msg_index.

* update test.

* fix binary reduce following new minigun template

* enable both int64 and int32 kernels

* [BUGFIX] fix bugs for running GCN on giant graphs. (#561)

* load mxnet csr.

* enable load large csr.

* fix

* fix.

* fix int overflow.

* fix test.

* new kernel interface done for CPU

* docstring

* rename & docstring

* copy reduce and backward

* [BugFix] Fix error when bfs_level = 0 in Entity Classification with RGCN (#559)

* [DEMO] Update demo of distributed sampler (#564)

* update

* update

* update demo

* adapt cuda kernels to the new interface

* add network cpp test (#565)

* fix bug

* Add unittest for C++ RPC (#566)

* [CI] Fix CI for cpp test (#570)

* fix CI for cpp test

* update port number

* [Docker] update docker image (#575)

* update docker image

* specify lint version

* rm torch import from unified tests

* remove pytorch-specific test_function

* fix unittest

* fix

* fix unittest backend bug in converting tensor to numpy array

* fix

* mxnet version

* [BUGFIX] fix for MXNet 1.5. (#552)

* remove clone.

* turn on numpy compatible.

* Revert "remove clone."

This reverts commit 17bbf76ed72ff178df6b3f35addc428048672457.

* revert format changes

* fix mxnet api name

* revert mistakes in previous revert

* roll back CI to 20190523 build

* fix unittest

* disable test_shared_mem_store.py for now

* remove mxnet/test_specialization.py

* sync win64 test script

* fix lowercase

* missing backend in gpu unit test

* transpose to get forward graph

* pass update all

* add sanity check

* passing test_specialization.py

* fix and pass test_function

* fix check

* fix pytorch softmax

* mxnet kernels

* c++ lint

* pylint

* try

* win build

* fix

* win

* ci enable gpu build

* init submodule recursively

* backend docstring

* try

* test win dev

* doc string

* disable pytorch test_nn

* try to fix windows issue

* bug fixed, revert changes

* [Test] fix CI. (#586)

* disable unit test in mxnet tutorial.

* retry socket connection.

* roll back to set_np_compat

* try to fix multi-processing test hangs when it fails.

* fix test.

* fix.

* doc string

* doc string and clean up

* missing field in ctypes

* fix node flow schedule and unit test

* rename

* pylint

* copy from parent default context

* fix unit test script

* fix

* demo bug in nodeflow gpu test

* [Kernel][Bugfix] fix nodeflow bug (#604)

* fix nodeflow bug

* remove debug code

* add build gtest option

* fix cmake; fix graph index bug in spmv.py

* remove clone

* fix div rhs grad bug

* [Kernel] Support full builtin method, edge softmax and unit tests (#605)

* add full builtin support

* unit test

* unit test backend

* edge softmax

* apply edge with builtin

* fix kernel unit test

* disable mxnet test_shared_mem_store

* gen builtin reduce

* enable mxnet gpu unittest

* revert some changes

* docstring

* add note for the hack

* [Kernel][Unittest][CI] Fix MXNet GPU CI (#607)

* update docker image for MXNet GPU CI

* force all dgl graph input and output on CPU

* fix gpu unittest

* speedup compilation

* add some comments

* lint

* add more comments

* fix as requested

* add some comments

* comment

* lint

* lint

* update pylint

* fix as requested

* lint

* lint

* lint

* docstrings of python DGL kernel entries

* disable lint warnings on arguments in kernel.py

* fix docstring in scheduler

* fix some bug in unittest; try again

* Revert "Merge branch 'kernel' of github.com:zzhang-cn/dgl into kernel"

This reverts commit 1d2299e68b004182ea6130b088de1f1122b18a49, reversing
changes made to ddc97fbf1bec2b7815c0da7c74f7ecb2f428889b.

* Revert "fix some bug in unittest; try again"

This reverts commit ddc97fbf1bec2b7815c0da7c74f7ecb2f428889b.

* more comprehensive kernel test

* remove shape check in test_specialization
parent da0c92a2
...@@ -6,6 +6,8 @@ import numpy as np
import mxnet as mx
import mxnet.ndarray as nd
import numbers
from ... import ndarray as dglnd
from ... import kernel as K

MX_VERSION = LooseVersion(mx.__version__)
# After MXNet 1.5, empty tensors aren't supported by default.
...@@ -92,6 +94,12 @@ def ndim(input):
def context(input):
    return input.context
def device_type(ctx):
return ctx.device_type
def device_id(ctx):
return ctx.device_id
def astype(input, ty):
    return nd.cast(input, ty)

...@@ -164,9 +172,6 @@ def zeros_like(input):
def ones(shape, dtype, ctx):
    return nd.ones(shape, dtype=dtype, ctx=ctx)

-def spmm(x, y):
-    return nd.dot(x, y)

def unsorted_1d_segment_sum(input, seg_id, n_segs, dim):
    # TODO: support other dimensions
    assert dim == 0, 'MXNet only supports segment sum on first dimension'

...@@ -246,3 +251,141 @@ def zerocopy_to_numpy(arr):
def zerocopy_from_numpy(np_data):
    # NOTE: not zerocopy
    return nd.array(np_data, dtype=np_data.dtype)
def zerocopy_to_dgl_ndarray(arr):
return dglnd.from_dlpack(arr.to_dlpack_for_read())
def zerocopy_to_dgl_ndarray_for_write(arr):
return dglnd.from_dlpack(arr.to_dlpack_for_write())
def zerocopy_from_dgl_ndarray(arr):
return nd.from_dlpack(arr.to_dlpack())
class BinaryReduce(mx.autograd.Function):
def __init__(self, reducer, binary_op, graph, lhs, rhs, out_size, lhs_map,
rhs_map, out_map):
super(BinaryReduce, self).__init__()
self.reducer = reducer
self.binary_op = binary_op
self.graph = graph
self.lhs = lhs
self.rhs = rhs
self.out_size = out_size
self.lhs_map = lhs_map
self.rhs_map = rhs_map
self.out_map = out_map
def forward(self, lhs_data, rhs_data):
lhs_data_nd = zerocopy_to_dgl_ndarray(lhs_data)
rhs_data_nd = zerocopy_to_dgl_ndarray(rhs_data)
feat_shape = K.infer_binary_feature_shape(lhs_data_nd, rhs_data_nd)
out_data = nd.empty((self.out_size,) + feat_shape,
ctx=lhs_data.context, dtype=lhs_data.dtype)
out_data_nd = zerocopy_to_dgl_ndarray_for_write(out_data)
K.binary_op_reduce(
self.reducer, self.binary_op, self.graph, self.lhs, self.rhs,
lhs_data_nd, rhs_data_nd, out_data_nd, self.lhs_map[0],
self.rhs_map[0], self.out_map[0])
self.save_for_backward(lhs_data_nd, rhs_data_nd, out_data_nd,
feat_shape)
return out_data
def backward(self, grad_out):
lhs_data_nd, rhs_data_nd, out_data_nd, feat_shape = self.saved_tensors
grad_out_nd = zerocopy_to_dgl_ndarray(grad_out)
grad_lhs = nd.empty((lhs_data_nd.shape[0],) + feat_shape,
ctx=grad_out.context, dtype=grad_out.dtype)
K.backward_lhs_binary_op_reduce(
self.reducer, self.binary_op, self.graph, self.lhs, self.rhs,
lhs_data_nd, rhs_data_nd, out_data_nd, grad_out_nd,
zerocopy_to_dgl_ndarray_for_write(grad_lhs), self.lhs_map[1],
self.rhs_map[1], self.out_map[1])
grad_lhs = _reduce_grad(grad_lhs, lhs_data_nd.shape)
grad_rhs = nd.empty((rhs_data_nd.shape[0],) + feat_shape,
ctx=grad_out.context, dtype=grad_out.dtype)
K.backward_rhs_binary_op_reduce(
self.reducer, self.binary_op, self.graph, self.lhs, self.rhs,
lhs_data_nd, rhs_data_nd, out_data_nd, grad_out_nd,
zerocopy_to_dgl_ndarray_for_write(grad_rhs), self.lhs_map[1],
self.rhs_map[1], self.out_map[1])
grad_rhs = _reduce_grad(grad_rhs, rhs_data_nd.shape)
return grad_lhs, grad_rhs
def binary_reduce(reducer, binary_op, graph, lhs, rhs, lhs_data, rhs_data,
out_size, lhs_map, rhs_map, out_map):
func = BinaryReduce(reducer, binary_op, graph, lhs, rhs, out_size, lhs_map,
rhs_map, out_map)
return func(lhs_data, rhs_data)
class CopyReduce(mx.autograd.Function):
def __init__(self, reducer, graph, target, out_size, in_map, out_map):
super(CopyReduce, self).__init__()
self.reducer = reducer
self.graph = graph
self.target = target
self.out_size = out_size
self.in_map = in_map
self.out_map = out_map
def forward(self, in_data):
feat_shape = in_data.shape[1:]
out_data = nd.empty((self.out_size,) + feat_shape,
ctx=in_data.context, dtype=in_data.dtype)
in_data_nd = zerocopy_to_dgl_ndarray(in_data)
out_data_nd = zerocopy_to_dgl_ndarray_for_write(out_data)
K.copy_reduce(
self.reducer, self.graph, self.target, in_data_nd, out_data_nd,
self.in_map[0], self.out_map[0])
self.save_for_backward(in_data_nd, out_data_nd)
return out_data
def backward(self, grad_out):
in_data_nd, out_data_nd = self.saved_tensors
grad_out_nd = zerocopy_to_dgl_ndarray(grad_out)
grad_in = nd.empty(in_data_nd.shape, ctx=grad_out.context,
dtype=grad_out.dtype)
K.backward_copy_reduce(
self.reducer, self.graph, self.target, in_data_nd, out_data_nd,
grad_out_nd, zerocopy_to_dgl_ndarray_for_write(grad_in),
self.in_map[1], self.out_map[1])
return grad_in
def copy_reduce(reducer, graph, target, in_data, out_size, in_map, out_map):
func = CopyReduce(reducer, graph, target, out_size, in_map, out_map)
return func(in_data)
def _reduce_grad(grad, shape):
"""Reduce gradient on the broadcast dimension
If there is broadcast in forward pass, gradients need to be reduced on
broadcast dimension. This function checks the input tensor shape and
gradient shape and performs the reduction.
Parameters
----------
grad: Tensor
Gradient tensor
shape: tuple
Shape of input tensor
Returns
-------
Tensor
"""
grad_shape = grad.shape[1:]
in_shape = shape[1:]
if in_shape == grad_shape:
# no need to reduce
return grad
num_to_squeeze = len(grad_shape) - len(in_shape)
# pad in_shape
in_shape = (1,) * num_to_squeeze + in_shape
reduce_idx = np.nonzero(np.array(grad_shape) - np.array(in_shape))[0]
reduce_idx += 1 # skip batch dim
grad = grad.sum(axis=tuple(reduce_idx), keepdims=True)
return grad.reshape(shape)
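For intuition, the broadcast-gradient reduction above can be restated as a standalone numpy sketch (hypothetical shapes, illustration only, not part of the commit):

import numpy as np

def reduce_grad_sketch(grad, shape):
    # mirrors _reduce_grad: sum the gradient over the dimensions that were broadcast
    grad_shape, in_shape = grad.shape[1:], shape[1:]
    if in_shape == grad_shape:
        return grad
    in_shape = (1,) * (len(grad_shape) - len(in_shape)) + in_shape
    reduce_idx = np.nonzero(np.array(grad_shape) - np.array(in_shape))[0] + 1  # skip batch dim
    return grad.sum(axis=tuple(reduce_idx), keepdims=True).reshape(shape)

grad = np.ones((5, 4, 3))                           # gradient of the broadcast result
print(reduce_grad_sketch(grad, (5, 1, 3)).shape)    # (5, 1, 3): summed over the broadcast dim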
...@@ -5,6 +5,9 @@ from distutils.version import LooseVersion
import torch as th
from torch.utils import dlpack
from ... import ndarray as nd
from ... import kernel as K
TH_VERSION = LooseVersion(th.__version__)

def data_type_dict():

...@@ -31,23 +34,11 @@ def get_preferred_sparse_format():
    """
    return "coo"

-if TH_VERSION.version[0] == 0:
-    def sparse_matrix(data, index, shape, force_format=False):
-        fmt = index[0]
-        if fmt != 'coo':
-            raise TypeError('Pytorch backend only supports COO format. But got %s.' % fmt)
-        # NOTE: use _sparse_coo_tensor_unsafe to avoid unnecessary boundary check
-        spmat = th._sparse_coo_tensor_unsafe(index[1], data, shape)
-        # No conversion is required.
-        return spmat, None
-else:
-    # VERSION 1.0+
-    def sparse_matrix(data, index, shape, force_format=False):
-        fmt = index[0]
-        if fmt != 'coo':
-            raise TypeError('Pytorch backend only supports COO format. But got %s.' % fmt)
-        spmat = th.sparse_coo_tensor(index[1], data, shape)
-        # No conversion is required.
-        return spmat, None
def sparse_matrix(data, index, shape, force_format=False):
    fmt = index[0]
    if fmt != 'coo':
        raise TypeError('Pytorch backend only supports COO format. But got %s.' % fmt)
    spmat = th.sparse_coo_tensor(index[1], data, shape)
    return spmat, None

def sparse_matrix_indices(spmat):
...@@ -68,6 +59,15 @@ def ndim(input):
def context(input):
    return input.device
def device_type(ctx):
return ctx.type
def device_id(ctx):
if ctx.index is None:
return 0
else:
return ctx.index
def astype(input, ty):
    return input.type(ty)

...@@ -135,18 +135,6 @@ def zeros_like(input):
def ones(shape, dtype, ctx):
    return th.ones(shape, dtype=dtype, device=ctx)

-def spmm(x, y):
-    dst, src = x._indices()
-    # scatter index
-    index = dst.view(-1, 1).expand(-1, y.shape[1])
-    # zero tensor to be scatter_add to
-    out = y.new_full((x.shape[0], y.shape[1]), 0)
-    # look up src features and multiply by edge features
-    # Note: using y[src] instead of index_select will lead to terrible
-    # performance in backward
-    feature = th.index_select(y, 0, src) * x._values().unsqueeze(-1)
-    return out.scatter_add(0, index, feature)

def unsorted_1d_segment_sum(input, seg_id, n_segs, dim):
    y = th.zeros(n_segs, *input.shape[1:]).to(input)
    seg_id = seg_id.view((-1,) + (1,) * (input.dim() - 1)).expand_as(input)

...@@ -201,3 +189,121 @@ def zerocopy_to_numpy(input):
def zerocopy_from_numpy(np_array):
    return th.from_numpy(np_array)
def zerocopy_to_dgl_ndarray(input):
return nd.from_dlpack(dlpack.to_dlpack(input.contiguous()))
def zerocopy_from_dgl_ndarray(input):
return dlpack.from_dlpack(input.to_dlpack())
class BinaryReduce(th.autograd.Function):
@staticmethod
def forward(ctx, reducer, binary_op, graph, lhs, rhs, lhs_data, rhs_data,
out_size, lhs_map, rhs_map, out_map):
lhs_data_nd = zerocopy_to_dgl_ndarray(lhs_data)
rhs_data_nd = zerocopy_to_dgl_ndarray(rhs_data)
feat_shape = K.infer_binary_feature_shape(lhs_data_nd, rhs_data_nd)
out_data = lhs_data.new_empty((out_size,) + feat_shape)
out_data_nd = zerocopy_to_dgl_ndarray(out_data)
K.binary_op_reduce(
reducer, binary_op, graph, lhs, rhs, lhs_data_nd, rhs_data_nd,
out_data_nd, lhs_map[0], rhs_map[0], out_map[0])
# save_for_backward can only save variables
ctx.backward_cache = (reducer, binary_op, graph, lhs, rhs, lhs_map,
rhs_map, out_map, lhs_data_nd, rhs_data_nd,
out_data_nd, feat_shape)
return out_data
@staticmethod
def backward(ctx, grad_out):
reducer, binary_op, graph, lhs, rhs, lhs_map, rhs_map, out_map, \
lhs_data_nd, rhs_data_nd, out_data_nd, feat_shape \
= ctx.backward_cache
ctx.backward_cache = None
grad_lhs = None
grad_rhs = None
grad_out_nd = zerocopy_to_dgl_ndarray(grad_out)
if ctx.needs_input_grad[5]:
grad_lhs = grad_out.new_empty((lhs_data_nd.shape[0],) + feat_shape)
K.backward_lhs_binary_op_reduce(
reducer, binary_op, graph, lhs, rhs, lhs_data_nd, rhs_data_nd,
out_data_nd, grad_out_nd, zerocopy_to_dgl_ndarray(grad_lhs),
lhs_map[1], rhs_map[1], out_map[1])
grad_lhs = _reduce_grad(grad_lhs, lhs_data_nd.shape)
if ctx.needs_input_grad[6]:
grad_rhs = grad_out.new_empty((rhs_data_nd.shape[0],) + feat_shape)
K.backward_rhs_binary_op_reduce(
reducer, binary_op, graph, lhs, rhs, lhs_data_nd, rhs_data_nd,
out_data_nd, grad_out_nd, zerocopy_to_dgl_ndarray(grad_rhs),
lhs_map[1], rhs_map[1], out_map[1])
grad_rhs = _reduce_grad(grad_rhs, rhs_data_nd.shape)
return None, None, None, None, None, grad_lhs, grad_rhs, None, None, \
None, None
class CopyReduce(th.autograd.Function):
@staticmethod
def forward(ctx, reducer, graph, target, in_data, out_size, in_map,
out_map):
out_data = in_data.new_empty((out_size,) + in_data.shape[1:])
in_data_nd = zerocopy_to_dgl_ndarray(in_data)
out_data_nd = zerocopy_to_dgl_ndarray(out_data)
K.copy_reduce(
reducer, graph, target, in_data_nd, out_data_nd, in_map[0],
out_map[0])
# save_for_backward can only save variables
ctx.backward_cache = (reducer, graph, target, in_map, out_map,
in_data_nd, out_data_nd)
return out_data
@staticmethod
def backward(ctx, grad_out):
reducer, graph, target, in_map, out_map, in_data_nd, out_data_nd \
= ctx.backward_cache
ctx.backward_cache = None
grad_in = None
grad_out_nd = zerocopy_to_dgl_ndarray(grad_out)
if ctx.needs_input_grad[3]:
grad_in = grad_out.new_empty(in_data_nd.shape)
K.backward_copy_reduce(
reducer, graph, target, in_data_nd, out_data_nd, grad_out_nd,
zerocopy_to_dgl_ndarray(grad_in), in_map[1], out_map[1])
return None, None, None, grad_in, None, None, None
binary_reduce = BinaryReduce.apply
copy_reduce = CopyReduce.apply
def _reduce_grad(grad, shape):
"""Reduce gradient on the broadcast dimension
If there is broadcast in forward pass, gradients need to be reduced on
broadcast dimension. This function checks the input tensor shape and
gradient shape and performs the reduction.
Parameters
----------
grad: Tensor
Gradient tensor
shape: tuple
Shape of input tensor
Returns
-------
Tensor
"""
grad_shape = grad.shape[1:]
in_shape = shape[1:]
if in_shape == grad_shape:
# no need to reduce
return grad
num_to_squeeze = len(grad_shape) - len(in_shape)
# pad in_shape
in_shape = (1,) * num_to_squeeze + in_shape
reduce_idx = th.nonzero(th.tensor(grad_shape) - th.tensor(in_shape))
reduce_idx += 1 # skip batch dim
grad = grad.sum(dim=tuple(reduce_idx), keepdim=True)
return grad.view(shape)
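The wrappers above follow the standard custom torch.autograd.Function pattern: non-tensor state is stashed on ctx, gradients are computed only when ctx.needs_input_grad asks for them, and backward returns one entry per forward argument. A minimal toy sketch of that pattern (hypothetical ScaleBy function, not from this commit):

import torch as th

class ScaleBy(th.autograd.Function):
    @staticmethod
    def forward(ctx, alpha, data):
        # non-tensor state goes on ctx, like the graph/mapping handles cached above
        ctx.backward_cache = alpha
        return data * alpha

    @staticmethod
    def backward(ctx, grad_out):
        alpha = ctx.backward_cache
        ctx.backward_cache = None
        # one return value per forward argument; alpha gets None (not differentiable)
        grad_data = grad_out * alpha if ctx.needs_input_grad[1] else None
        return None, grad_data

x = th.ones(3, requires_grad=True)
ScaleBy.apply(2.0, x).sum().backward()   # x.grad is now tensor([2., 2., 2.])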
"""Built-in function base class""" """Built-in function base class"""
from __future__ import absolute_import from __future__ import absolute_import
__all__ = ['BuiltinFunction', 'BundledFunction'] __all__ = ['BuiltinFunction', 'TargetCode']
class BuiltinFunction(object):
"""Base builtin function class."""
@property
def name(self):
"""Return the name of this builtin function."""
raise NotImplementedError
class BundledFunction(object): class TargetCode(object):
"""A utility class that bundles multiple functions. """Code for target
Parameters Note: must be consistent with the target code definition in C++ side:
---------- src/kernel/binary_reduce_common.h
fn_list : list of callable
The function list.
""" """
def __init__(self, fn_list): SRC = 0
self.fn_list = fn_list DST = 1
EDGE = 2
def __call__(self, *args, **kwargs): CODE2STR = {
"""Regular computation of this builtin function 0: "src",
1: "dst",
2: "edge",
}
This will be used when optimization is not available and should
ONLY be called by DGL framework.
"""
ret = {}
for fn in self.fn_list:
ret.update(fn(*args, **kwargs))
return ret
class BuiltinFunction(object):
"""Base builtin function class."""
@property @property
def name(self): def name(self):
"""Return the name.""" """Return the name of this builtin function."""
return "bundled" raise NotImplementedError
"""Built-in message function.""" """Built-in message function."""
from __future__ import absolute_import from __future__ import absolute_import
import operator import sys
from itertools import product
from .base import BuiltinFunction from .base import BuiltinFunction, TargetCode
from .. import backend as F from ..runtime import ir
from ..runtime.ir import var
__all__ = ["src_mul_edge", "copy_src", "copy_edge"]
__all__ = ["src_mul_edge", "copy_src", "copy_edge", "copy_u", "copy_e"]
class MessageFunction(BuiltinFunction): class MessageFunction(BuiltinFunction):
"""Base builtin message function class.""" """Base builtin message function class."""
def __call__(self, edges): def _invoke(self, graph, src_frame, dst_frame, edge_frame, out_size,
"""Regular computation of this builtin function src_map, dst_map, edge_map, out_map, reducer="none"):
"""Symbolic computation of this builtin function to create
This will be used when optimization is not available and should runtime.executor
ONLY be called by DGL framework.
""" """
raise NotImplementedError raise NotImplementedError
...@@ -25,195 +27,223 @@ class MessageFunction(BuiltinFunction): ...@@ -25,195 +27,223 @@ class MessageFunction(BuiltinFunction):
"""Return the name of this builtin function.""" """Return the name of this builtin function."""
raise NotImplementedError raise NotImplementedError
def is_spmv_supported(self, g):
"""Return whether the SPMV optimization is supported."""
raise NotImplementedError
@property
def use_edge_feature(self):
"""Return true if the message function uses edge feature data."""
raise NotImplementedError
def _is_spmv_supported_edge_feat(g, field):
"""Return whether the edge feature shape supports SPMV optimization.
Only scalar feature is supported currently.
"""
feat = g.get_e_repr()[field]
shape = F.shape(feat)
return len(shape) == 1 or (len(shape) == 2 and shape[1] == 1)
class SrcMulEdgeMessageFunction(MessageFunction): class BinaryMessageFunction(MessageFunction):
"""Class for the src_mul_edge builtin message function. """Class for the lhs_op_rhs builtin message function.
See Also See Also
-------- --------
src_mul_edge src_mul_edge
""" """
def __init__(self, mul_op, src_field, edge_field, out_field): def __init__(self, binary_op, lhs, rhs, lhs_field, rhs_field, out_field):
self.mul_op = mul_op self.binary_op = binary_op
self.src_field = src_field self.lhs = lhs
self.edge_field = edge_field self.rhs = rhs
self.lhs_field = lhs_field
self.rhs_field = rhs_field
self.out_field = out_field self.out_field = out_field
def is_spmv_supported(self, g): def _invoke(self, graph, src_frame, dst_frame, edge_frame, out_size,
"""Return true if this supports SPMV optimization. src_map, dst_map, edge_map, out_map, reducer="none"):
"""Symbolic computation of builtin binary message function to create
Parameters runtime.executor
----------
g : DGLGraph
The graph.
Returns
-------
bool
True if this supports SPMV optimization.
"""
return _is_spmv_supported_edge_feat(g, self.edge_field)
def __call__(self, edges):
"""Regular computation of this builtin function
This will be used when optimization is not available and should
ONLY be called by DGL framework.
""" """
sdata = edges.src[self.src_field] graph = var.GRAPH(graph)
edata = edges.data[self.edge_field] in_frames = (src_frame, dst_frame, edge_frame)
# Due to the different broadcasting semantics of different backends, in_maps = (src_map, dst_map, edge_map)
# we need to broadcast the sdata and edata to be of the same rank. lhs_data = ir.READ_COL(in_frames[self.lhs], var.STR(self.lhs_field))
rank = max(F.ndim(sdata), F.ndim(edata)) rhs_data = ir.READ_COL(in_frames[self.rhs], var.STR(self.rhs_field))
sshape = F.shape(sdata) lhs_map = var.MAP(in_maps[self.lhs])
eshape = F.shape(edata) rhs_map = var.MAP(in_maps[self.rhs])
sdata = F.reshape(sdata, sshape + (1,) * (rank - F.ndim(sdata))) out_map = var.MAP(out_map)
edata = F.reshape(edata, eshape + (1,) * (rank - F.ndim(edata))) return ir.BINARY_REDUCE(reducer, self.binary_op, graph, self.lhs,
ret = self.mul_op(sdata, edata) self.rhs, lhs_data, rhs_data, out_size,
return {self.out_field : ret} lhs_map, rhs_map, out_map)
@property @property
def name(self): def name(self):
return "src_mul_edge" lhs = TargetCode.CODE2STR[self.lhs]
rhs = TargetCode.CODE2STR[self.rhs]
return "{}_{}_{}".format(lhs, self.binary_op, rhs)
@property
def use_edge_feature(self):
"""Return true if the message function uses edge feature data."""
return True
class CopySrcMessageFunction(MessageFunction): class CopyMessageFunction(MessageFunction):
"""Class for the copy_src builtin message function. """Class for the copy builtin message function.
See Also See Also
-------- --------
copy_src copy_src
""" """
def __init__(self, src_field, out_field): def __init__(self, target, in_field, out_field):
self.src_field = src_field self.target = target
self.in_field = in_field
self.out_field = out_field self.out_field = out_field
def is_spmv_supported(self, g): def _invoke(self, graph, src_frame, dst_frame, edge_frame, out_size,
"""Return true if this supports SPMV optimization. src_map, dst_map, edge_map, out_map, reducer="none"):
"""Symbolic computation of builtin message function to create
runtime.executor
"""
graph = var.GRAPH(graph)
in_frames = (src_frame, dst_frame, edge_frame)
in_maps = (src_map, dst_map, edge_map)
in_data = ir.READ_COL(in_frames[self.target], var.STR(self.in_field))
in_map = var.MAP(in_maps[self.target])
out_map = var.MAP(out_map)
return ir.COPY_REDUCE(reducer, graph, self.target, in_data, out_size,
in_map, out_map)
@property
def name(self):
return "copy_{}".format(TargetCode.CODE2STR[self.target])
def copy_u(u, out):
"""Builtin message function that computes message using source node
feature.
Parameters Parameters
---------- ----------
g : DGLGraph u : str
The graph. The source feature field.
out : str
The output message field.
Returns Examples
------- --------
bool >>> import dgl
True if this supports SPMV optimization. >>> message_func = dgl.function.copy_u('h', 'm')
"""
return True
def __call__(self, edges): The above example is equivalent to the following user defined function:
"""Regular computation of this builtin function
This will be used when optimization is not available and should >>> def message_func(edges):
ONLY be called by DGL framework. >>> return {'m': edges.src['h']}
""" """
return {self.out_field : edges.src[self.src_field]} return CopyMessageFunction(TargetCode.SRC, u, out)
@property
def name(self):
return "copy_src"
@property def copy_e(e, out):
def use_edge_feature(self): """Builtin message function that computes message using edge feature.
"""Return true if the message function uses edge feature data."""
return False
class CopyEdgeMessageFunction(MessageFunction): Parameters
"""Class for the copy_edge builtin message function. ----------
e : str
The edge feature field.
out : str
The output message field.
See Also Examples
-------- --------
copy_edge >>> import dgl
>>> message_func = dgl.function.copy_e('h', 'm')
The above example is equivalent to the following user defined function:
>>> def message_func(edges):
>>> return {'m': edges.data['h']}
""" """
def __init__(self, edge_field=None, out_field=None): return CopyMessageFunction(TargetCode.EDGE, e, out)
self.edge_field = edge_field
self.out_field = out_field
###############################################################################
# Generate all following builtin message functions:
# u_add_v, u_sub_v, u_mul_v, u_div_v
# u_add_e, u_sub_e, u_mul_e, u_div_e
# v_add_u, v_sub_u, v_mul_u, v_div_u
# v_add_e, v_sub_e, v_mul_e, v_div_e
# e_add_u, e_sub_u, e_mul_u, e_div_u
# e_add_v, e_sub_v, e_mul_v, e_div_v
_TARGET_MAP = {
"u": TargetCode.SRC,
"v": TargetCode.DST,
"e": TargetCode.EDGE,
}
def is_spmv_supported(self, g):
"""Return true if this supports SPMV optimization. def _gen_message_builtin(lhs, rhs, binary_op):
name = "{}_{}_{}".format(lhs, binary_op, rhs)
docstring = """Builtin message function that computes message by performing
binary operation {} between {} feature and {} feature.
Parameters Parameters
---------- ----------
g : DGLGraph {} : str
The graph. The {} feature field.
{} : str
The {} feature field.
out : str
The output message field.
Returns Examples
------- --------
bool >>> import dgl
True if this supports SPMV optimization. >>> message_func = dgl.function.{}('h', 'h', 'm')
""" """.format(binary_op,
# TODO: support this with e2v spmv TargetCode.CODE2STR[_TARGET_MAP[lhs]],
return False TargetCode.CODE2STR[_TARGET_MAP[rhs]],
# return _is_spmv_supported_edge_feat(g, self.edge_field) lhs, TargetCode.CODE2STR[_TARGET_MAP[lhs]],
rhs, TargetCode.CODE2STR[_TARGET_MAP[rhs]],
name)
def __call__(self, edges): def func(lhs_field, rhs_field, out):
"""Regular computation of this builtin function return BinaryMessageFunction(
binary_op, _TARGET_MAP[lhs],
_TARGET_MAP[rhs], lhs_field, rhs_field, out)
func.__name__ = name
func.__doc__ = docstring
return func
This will be used when optimization is not available and should
ONLY be called by DGL framework.
"""
return {self.out_field : edges.data[self.edge_field]}
@property def _register_builtin_message_func():
def name(self): """Register builtin message functions"""
return "copy_edge" target = ["u", "v", "e"]
for lhs, rhs in product(target, target):
if lhs != rhs:
for binary_op in ["add", "sub", "mul", "div"]:
func = _gen_message_builtin(lhs, rhs, binary_op)
setattr(sys.modules[__name__], func.__name__, func)
__all__.append(func.__name__)
_register_builtin_message_func()
@property
def use_edge_feature(self): ##############################################################################
"""Return true if the message function uses edge feature data.""" # For backward compatibility
return True
def src_mul_edge(src, edge, out): def src_mul_edge(src, edge, out):
"""Builtin message function that computes message by multiplying source """Builtin message function that computes message by performing
node features with edge features. binary operation mul between src feature and dst feature.
Notes
-----
This function is deprecated. Please use u_mul_e instead.
Parameters Parameters
---------- ----------
src : str src : str
The source feature field. The source feature field.
edge : str dst : str
The edge feature field. The destination feature field.
out : str out : str
The output message field. The output message field.
Examples Examples
-------- --------
>>> import dgl >>> import dgl
>>> message_func = dgl.function.src_mul_edge(src='h', edge='w', out='m') >>> message_func = dgl.function.src_mul_edge('h', 'h', 'm')
The above example is equivalent to the following user defined function:
>>> def message_func(edges):
>>> return {'m': edges.src['h'] * edges.data['w']}
""" """
return SrcMulEdgeMessageFunction(operator.mul, src, edge, out) return getattr(sys.modules[__name__], "u_mul_e")(src, edge, out)
def copy_src(src, out): def copy_src(src, out):
"""Builtin message function that computes message using source node feature. """Builtin message function that computes message using source node
feature.
Notes
-----
This function is deprecated. Please use copy_u instead.
Parameters Parameters
---------- ----------
...@@ -225,18 +255,23 @@ def copy_src(src, out): ...@@ -225,18 +255,23 @@ def copy_src(src, out):
Examples Examples
-------- --------
>>> import dgl >>> import dgl
>>> message_func = dgl.function.copy_src(src='h', out='m') >>> message_func = dgl.function.copy_src('h', 'm')
The above example is equivalent to the following user defined function: The above example is equivalent to the following user defined function:
>>> def message_func(edges): >>> def message_func(edges):
>>> return {'m': edges.src['h']} >>> return {'m': edges.src['h']}
""" """
return CopySrcMessageFunction(src, out) return copy_u(src, out)
def copy_edge(edge, out): def copy_edge(edge, out):
"""Builtin message function that computes message using edge feature. """Builtin message function that computes message using edge feature.
Notes
-----
This function is deprecated. Please use copy_e instead.
Parameters Parameters
---------- ----------
edge : str edge : str
...@@ -247,11 +282,11 @@ def copy_edge(edge, out): ...@@ -247,11 +282,11 @@ def copy_edge(edge, out):
Examples Examples
-------- --------
>>> import dgl >>> import dgl
>>> message_func = dgl.function.copy_edge(edge='h', out='m') >>> message_func = dgl.function.copy_edge('h', 'm')
The above example is equivalent to the following user defined function: The above example is equivalent to the following user defined function:
>>> def message_func(edges): >>> def message_func(edges):
>>> return {'m': edges.data['h']} >>> return {'m': edges.data['h']}
""" """
return CopyEdgeMessageFunction(edge, out) return copy_e(edge, out)
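A minimal usage sketch of the builtins defined above on a toy graph (PyTorch backend assumed; graph and field names are arbitrary):

import torch as th
import dgl
import dgl.function as fn

g = dgl.DGLGraph()
g.add_nodes(3)
g.add_edges([0, 1], [2, 2])          # two edges into node 2
g.ndata['h'] = th.ones(3, 4)
g.edata['w'] = th.ones(2, 1)

# the generated builtin u_mul_e pairs with a builtin reducer; the scheduler maps
# the pair onto a fused binary-op-reduce kernel instead of materializing messages
g.update_all(fn.u_mul_e('h', 'w', 'm'), fn.sum('m', 'h_new'))
print(g.ndata['h_new'].shape)        # (3, 4)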
...@@ -2,19 +2,20 @@
# pylint: disable=redefined-builtin
from __future__ import absolute_import

-from .. import backend as F
-from .base import BuiltinFunction
import sys

from .base import BuiltinFunction, TargetCode
from ..runtime import ir
from ..runtime.ir import var

-__all__ = ["sum", "max"]

class ReduceFunction(BuiltinFunction):
    """Base builtin reduce function class."""
-    def __call__(self, nodes):
-        """Regular computation of this builtin function
-
-        This will be used when optimization is not available and should
-        ONLY be called by DGL framework.
-        """
-        raise NotImplementedError
    def _invoke(self, graph, edge_frame, out_size, edge_map=None,
                out_map=None):
        """Symbolic computation of this builtin function to create
        runtime.executor
        """
        raise NotImplementedError

...@@ -23,34 +24,37 @@ class ReduceFunction(BuiltinFunction):
        """Return the name of this builtin function."""
        raise NotImplementedError

-    def is_spmv_supported(self):
-        """Return whether the SPMV optimization is supported."""
-        raise NotImplementedError

class SimpleReduceFunction(ReduceFunction):
    """Builtin reduce function that aggregates a single field into another
    single field."""
-    def __init__(self, name, reduce_op, msg_field, out_field):
    def __init__(self, name, msg_field, out_field):
        self._name = name
-        self.reduce_op = reduce_op
        self.msg_field = msg_field
        self.out_field = out_field

-    def is_spmv_supported(self):
-        """Return whether the SPMV optimization is supported."""
-        # NOTE: only sum is supported right now.
-        return self._name == "sum"
-
-    def __call__(self, nodes):
-        return {self.out_field : self.reduce_op(nodes.mailbox[self.msg_field], 1)}
    def _invoke(self, graph, edge_frame, out_size, edge_map=None,
                out_map=None):
        """Symbolic execution of this builtin function"""
        reducer = self._name
        graph = var.GRAPH(graph)
        edge_map = var.MAP(edge_map)
        out_map = var.MAP(out_map)
        edge_data = ir.READ_COL(edge_frame, var.STR(self.msg_field))
        return ir.COPY_REDUCE(reducer, graph, TargetCode.EDGE, edge_data,
                              out_size, edge_map, out_map)

    @property
    def name(self):
        return self._name

-def sum(msg, out):
-    """Builtin reduce function that aggregates messages by sum.
###############################################################################
# Generate all following reducer functions:
# sum, max, min, prod

def _gen_reduce_builtin(reducer):
    docstring = """Builtin reduce function that aggregates messages by {0}.

    Parameters
    ----------
...@@ -61,37 +65,32 @@ def sum(msg, out):

    Examples
    --------
    >>> import dgl
-    >>> reduce_func = dgl.function.sum(msg='m', out='h')
    >>> reduce_func = dgl.function.{0}('m', 'h')

    The above example is equivalent to the following user defined function
    (if using PyTorch):

    >>> import torch
    >>> def reduce_func(nodes):
-    >>>     return {'h': torch.sum(nodes.mailbox['m'], dim=1)}
-    """
-    return SimpleReduceFunction("sum", F.sum, msg, out)
    >>>     return {{'h': torch.{0}(nodes.mailbox['m'], dim=1)}}
    """.format(reducer)

-def max(msg, out):
-    """Builtin reduce function that aggregates messages by max.
-
-    Parameters
-    ----------
-    msg : str
-        The message field.
-    out : str
-        The output node feature field.
-
-    Examples
-    --------
-    >>> import dgl
-    >>> reduce_func = dgl.function.max(msg='m', out='h')
-
-    The above example is equivalent to the following user defined function
-    (if using PyTorch):
-
-    >>> import torch
-    >>> def reduce_func(nodes):
-    >>>     return {'h': torch.max(nodes.mailbox['m'], dim=1)[0]}
-    """
-    return SimpleReduceFunction("max", F.max, msg, out)
    def func(msg, out):
        return SimpleReduceFunction(reducer, msg, out)
    func.__name__ = reducer
    func.__doc__ = docstring
    return func

__all__ = []

def _register_builtin_reduce_func():
    """Register builtin reduce functions"""
    for reduce_op in ["max", "min", "sum", "prod"]:
        builtin = _gen_reduce_builtin(reduce_op)
        setattr(sys.modules[__name__], reduce_op, builtin)
        __all__.append(reduce_op)

_register_builtin_reduce_func()
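The generated reducers behave exactly like the per-node UDFs shown in their docstrings; a toy check in plain PyTorch (hypothetical mailbox with two messages per node):

import torch as th

mailbox = th.tensor([[[1., 2.], [3., 4.]],     # node 0: two incoming messages
                     [[5., 6.], [7., 8.]]])    # node 1: two incoming messages

h_sum = th.sum(mailbox, dim=1)        # what fn.sum('m', 'h') produces per node
h_max = th.max(mailbox, dim=1)[0]     # what fn.max('m', 'h') produces per node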
...@@ -3048,7 +3048,7 @@ class DGLGraph(DGLBaseGraph):
        n_repr = self.get_n_repr(v)
        nbatch = NodeBatch(self, v, n_repr)
-        n_mask = predicate(nbatch)
        n_mask = F.copy_to(predicate(nbatch), F.cpu())

        if is_all(nodes):
            return F.nonzero_1d(n_mask)

...@@ -3121,7 +3121,7 @@ class DGLGraph(DGLBaseGraph):
        edge_data = self.get_e_repr(eid)
        dst_data = self.get_n_repr(v)
        ebatch = EdgeBatch(self, (u, v, eid), src_data, edge_data, dst_data)
-        e_mask = predicate(ebatch)
        e_mask = F.copy_to(predicate(ebatch), F.cpu())

        if is_all(edges):
            return F.nonzero_1d(e_mask)
...
...@@ -427,16 +427,16 @@ class GraphIndex(object):
        utils.Index
            The edge ids.
        """
-        key = 'edges_s%s' % order
-        if key not in self._cache:
-            if order is None:
-                order = ""
-            edge_array = _CAPI_DGLGraphEdges(self._handle, order)
-            src = utils.toindex(edge_array(0))
-            dst = utils.toindex(edge_array(1))
-            eid = utils.toindex(edge_array(2))
-            self._cache[key] = (src, dst, eid)
-        return self._cache[key]
        if order is None:
            order = ""
        edge_array = _CAPI_DGLGraphEdges(self._handle, order)
        src = edge_array(0)
        dst = edge_array(1)
        eid = edge_array(2)
        src = utils.toindex(src)
        dst = utils.toindex(dst)
        eid = utils.toindex(eid)
        return src, dst, eid
    def in_degree(self, v):
        """Return the in degree of the node.
...@@ -598,8 +598,38 @@
        else:
            raise Exception("unknown format")
@utils.cached_member(cache='_cache', prefix='immu_gidx')
def get_immutable_gidx(self, ctx):
"""Create an immutable graph index and copy to the given device context.
Note: this internal function is for DGL scheduler use only
Parameters
----------
ctx : DGLContext
The context of the returned graph.
Returns
-------
GraphIndex
"""
return self.to_immutable().asbits(self.bits_needed()).copy_to(ctx)
def get_csr_shuffle_order(self):
"""Return the edge shuffling order when a coo graph is converted to csr format
Returns
-------
tuple of two utils.Index
The first element of the tuple is the shuffle order for outward graph
The second element of the tuple is the shuffle order for inward graph
"""
csr = _CAPI_DGLGraphGetAdj(self._handle, True, "csr")
order = csr(2)
rev_csr = _CAPI_DGLGraphGetAdj(self._handle, False, "csr")
rev_order = rev_csr(2)
return utils.toindex(order), utils.toindex(rev_order)
@utils.cached_member(cache='_cache', prefix='adj')
    def adjacency_matrix(self, transpose, ctx):
        """Return the adjacency matrix representation of this graph.
...@@ -650,7 +680,6 @@
        else:
            raise Exception("unknown format")
-    @utils.cached_member(cache='_cache', prefix='inc')
    def incidence_matrix(self, typestr, ctx):
        """Return the incidence matrix representation of this graph.
...@@ -761,6 +790,86 @@
        handle = _CAPI_DGLGraphLineGraph(self._handle, backtracking)
        return GraphIndex(handle)
def to_immutable(self):
"""Convert this graph index to an immutable one.
Returns
-------
GraphIndex
An immutable graph index.
"""
handle = _CAPI_DGLToImmutable(self._handle)
return GraphIndex(handle)
def ctx(self):
"""Return the context of this graph index.
Returns
-------
DGLContext
The context of the graph.
"""
return _CAPI_DGLGraphContext(self._handle)
def copy_to(self, ctx):
"""Copy this immutable graph index to the given device context.
NOTE: this method only works for immutable graph index
Parameters
----------
ctx : DGLContext
The target device context.
Returns
-------
GraphIndex
The graph index on the given device context.
"""
handle = _CAPI_DGLImmutableGraphCopyTo(self._handle, ctx.device_type, ctx.device_id)
return GraphIndex(handle)
def nbits(self):
"""Return the number of integer bits used in the storage (32 or 64).
Returns
-------
int
The number of bits.
"""
return _CAPI_DGLGraphNumBits(self._handle)
def bits_needed(self):
"""Return the number of integer bits needed to represent the graph
Returns
-------
int
The number of bits needed
"""
if self.number_of_edges() >= 0x80000000 or self.number_of_nodes() >= 0x80000000:
return 64
else:
return 32
def asbits(self, bits):
"""Transform the graph to a new one with the given number of bits storage.
NOTE: this method only works for immutable graph index
Parameters
----------
bits : int
The number of integer bits (32 or 64)
Returns
-------
GraphIndex
The graph index stored using the given number of bits.
"""
handle = _CAPI_DGLImmutableGraphAsNumBits(self._handle, int(bits))
return GraphIndex(handle)
class SubgraphIndex(GraphIndex):
    """Graph index for subgraph.
...
"""Module for dgl kernels for graph computation."""
from __future__ import absolute_import
from ._ffi.function import _init_api
from .ndarray import empty
def infer_binary_feature_shape(lhs, rhs):
"""Infer the output feature shape after a binary operation between lhs and rhs.
Parameters
----------
lhs : dgl.ndarray.NDArray
The lhs tensor.
rhs : dgl.ndarray.NDArray
The rhs tensor.
Returns
-------
tuple of int
The output feature shape.
"""
ret = _CAPI_DGLKernelInferBinaryFeatureShape(lhs, rhs)
return tuple(ret.asnumpy())
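# A small illustration of the numpy-style broadcasting this performs on the
# feature dimensions (assuming `lhs` and `rhs` are dgl.ndarray.NDArray objects
# whose feature shapes are (D1, 1) and (1, D2) respectively):
#
#   feat_shape = infer_binary_feature_shape(lhs, rhs)   # -> (D1, D2)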
# pylint: disable=invalid-name
def binary_op_reduce(reducer, op, G, A_target, B_target, A, B, out,
A_rows=None, B_rows=None, out_rows=None):
"""Perform binary operation on the edges of graph ``G``, and optionally
reduce the per-edge result by edge destinations into per-node result.
Details
-------
Concretely, this function could be decomposed into two steps:
1. Perform binary operations on each edge (u, v, e) on graph ``G`` as
follows,::
C[e] = A[select_A_target(u, v, e)] op B[select_B_target(u, v, e)]
where
* ``select_A_target`` and ``select_B_target`` would return the source
node ID, destination node ID, or edge ID, according to ``A_target``
and ``B_target`` which could take either
- "source" (0),
- "destination" (1), or
- "edge" (2).
* ``A`` and ``B`` are data tensors. If ``A_target`` is "edge", then
``A.shape[0]`` should equal the number of edges of ``G``. Otherwise
that should equal the number of nodes of ``G``. Similar constraints
apply for ``B``.
* ``op`` could be either of the following strings: "add", "mul", "sub",
"div".
2. Perform the optional reduction step on ``C`` computed previously.
* If ``reducer`` is "none", then no reduction is performed and we return
the per-edge result ``C`` directly,::
out[e] = C[e]
* Otherwise, the per-edge result ``C`` is reduced into per-node result
according to edge destinations, in a similar fashion as
``unsorted_segment_XXX`` in Tensorflow or ``scatter_XXX`` in PyTorch
or PyTorch-Scatter. For all ``v`` that have incoming edges,::
out[v] = reducer_{e: (u, v, e) in G} C[e]
Broadcasting
------------
Broadcasting is supported on the feature dimensions, following numpy
semantics.
Examples::
A.shape = (N, D1, D2) # N is the number of nodes
B.shape = (M, D1, 1) # M is the number of edges
C = BinaryOpReduce("sum", "add", graph, A, B, ...)
C.shape = (N, D1, D2)
Partial reads/writes
--------------------
Optionally, one can provide which rows to read from ``A`` and ``B`` with
``A_rows`` and ``B_rows``, both of which are 1D integer arrays. Similarly,
one can provide which rows to write to ``out`` with ``out_rows``, which is
again a 1D integer array. Concretely,
* Instead of from ``A`` and ``B``, ``C`` would be computed from
``A[A_rows]`` and ``B[B_rows]``. This implies that
* ``A`` and ``B`` no longer need to have the same number of rows as
the number of nodes or edges in ``G``. However, ``A_rows`` and
``B_rows`` must have the same number of elements as the number of
nodes or edges in ``G``.
* Instead of directly writing to ``out``, it will selectively write some
rows of ``C`` or reduced ``C``,::
out[out_rows[i]] = C[i] if out_rows[i] != -1
Or
out[out_rows[i]] = reducer_{e: (u, v, e) in G} C[e]
Parameters
----------
reducer : str
The type of the reducer ("sum", "max", "min", "mean", "prod", "none").
If the reducer is "none", the output is an edge feature tensor.
Otherwise, a node feature tensor is returned.
op : str
The type of the binary functor ("add", "mul", "sub", "div").
G : GraphIndex
The graph
A_target : int
Choice of source, destination, or edge ID for edges on left operand
B_target : int
Choice of source, destination, or edge ID for edges on right operand
A : NDArray
Data tensor of left operand
B : NDArray
Data tensor of right operand
out : NDArray (output)
Output tensor. The result will be written there in place.
A_rows : NDArray, optional
The rows to read from A.
B_rows : NDArray, optional
The rows to read from B.
out_rows : NDArray, optional
The rows to write to output tensor.
"""
if A_rows is None:
A_rows = empty([])
if B_rows is None:
B_rows = empty([])
if out_rows is None:
out_rows = empty([])
_CAPI_DGLKernelBinaryOpReduce(
reducer, op, G._handle,
int(A_target), int(B_target),
A, B, out,
A_rows, B_rows, out_rows)
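# A minimal usage sketch (not part of the module; `gidx`, `src_feat`,
# `edge_feat` and `out` are assumed to be prepared by the caller, the data
# tensors as dgl.ndarray.NDArray objects and `out` with one row per node):
#
#   # multiply source-node features with edge features, then sum by destination
#   binary_op_reduce("sum", "mul", gidx, 0, 2, src_feat, edge_feat, out)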
# pylint: disable=invalid-name
def backward_lhs_binary_op_reduce(
reducer, op, G,
A_target, B_target,
A, B, out,
grad_out, grad_A,
A_rows=None, B_rows=None, out_rows=None):
"""Compute the gradient of ``binary_op_reduce`` w.r.t. ``A`` and store it
in ``grad_A``.
See ``binary_op_reduce`` for forward propagation and partial reads/writes.
Gradient of broadcasted tensors
-------------------------------
``grad_A`` is assumed to be unbroadcasted, i.e. the shape of ``grad_A``
is the same as ``grad_out`` except the first axis.
If broadcasting happened in forward propagation, one needs to manually
sum the gradients along the broadcasted dimension to yield the correct
gradient.
Parameters
----------
reducer : str
The type of the reducer ("sum", "max", "min", "mean", "prod", "none").
If the reducer is "none", the output is an edge feature tensor.
Otherwise, a node feature tensor is returned.
op : str
The type of the binary functor ("add", "mul", "sub", "div").
G : GraphIndex
The graph
A_target : int
Choice of source, destination, or edge ID for edges on left operand
B_target : int
Choice of source, destination, or edge ID for edges on right operand
A : NDArray
Data tensor of left operand
B : NDArray
Data tensor of right operand
out : NDArray
Output tensor computed in the forward pass.
grad_out : NDArray
Gradient w.r.t. ``out``.
grad_A : NDArray (output)
Gradient w.r.t. ``A``. The result will be written there in place.
A_rows : NDArray, optional
The rows read from A.
B_rows : NDArray, optional
The rows read from B.
out_rows : NDArray, optional
The rows written to output tensor.
"""
if A_rows is None:
A_rows = empty([])
if B_rows is None:
B_rows = empty([])
if out_rows is None:
out_rows = empty([])
_CAPI_DGLKernelBackwardLhsBinaryOpReduce(
reducer, op, G._handle,
int(A_target), int(B_target),
A_rows, B_rows, out_rows,
A, B, out,
grad_out, grad_A)
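# Sketch of the manual unbroadcasting step described above, assuming the caller
# views `grad_A` as a torch tensor and `A` had feature shape (D1, 1) in the
# forward pass while `grad_out` has feature shape (D1, D2):
#
#   grad_A_torch = grad_A_torch.sum(dim=2, keepdim=True)   # (E, D1, D2) -> (E, D1, 1)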
# pylint: disable=invalid-name
def backward_rhs_binary_op_reduce(
reducer, op, G,
A_target, B_target,
A, B, out,
grad_out, grad_B,
A_rows=None, B_rows=None, out_rows=None):
"""Compute the gradient of ``binary_op_reduce`` w.r.t. ``B`` and store it
in ``grad_B``.
See ``binary_op_reduce`` for forward propagation and partial reads/writes.
Gradient of broadcasted tensors
-------------------------------
``grad_B`` is assumed to be unbroadcasted, i.e. the shape of ``grad_B``
is the same as ``grad_out`` except the first axis.
If broadcasting happened in forward propagation, one needs to manually
sum the gradients along the broadcasted dimension to yield the correct
gradient.
Parameters
----------
reducer : str
The type of the reducer ("sum", "max", "min", "mean", "prod", "none").
If the reducer is "none", the output is an edge feature tensor.
Otherwise, a node feature tensor is returned.
op : str
The type of the binary functor ("add", "mul", "sub", "div").
G : GraphIndex
The graph
A_target : int
Choice of source, destination, or edge ID for edges on left operand
B_target : int
Choice of source, destination, or edge ID for edges on right operand
A : NDArray
Data tensor of left operand
B : NDArray
Data tensor of right operand
out : NDArray
Output tensor computed in the forward pass.
grad_out : NDArray
Gradient w.r.t. ``out``.
grad_B : NDArray (output)
Gradient w.r.t. ``B``. The result will be written there in place.
A_rows : NDArray, optional
The rows read from A.
B_rows : NDArray, optional
The rows read from B.
out_rows : NDArray, optional
The rows written to output tensor.
"""
if A_rows is None:
A_rows = empty([])
if B_rows is None:
B_rows = empty([])
if out_rows is None:
out_rows = empty([])
_CAPI_DGLKernelBackwardRhsBinaryOpReduce(
reducer, op, G._handle,
int(A_target), int(B_target),
A_rows, B_rows, out_rows,
A, B, out,
grad_out, grad_B)
# pylint: disable=invalid-name
def copy_reduce(reducer, G, target,
X, out,
X_rows=None, out_rows=None):
"""Copy data in ``X`` according to source/destination/edge ID onto the
edges of graph ``G``, and optionally reduce the per-edge result by edge
destinations into per-node result.
Details
-------
Concretely, this function could be decomposed into two steps:
1. For each edge (u, v, e) on graph ``G``, set
C[e] = X[select_target(u, v, e)]
where
* ``select_target`` would return the source node ID, destination node
ID, or edge ID, according to ``target`` which could take either
- "source" (0),
- "destination" (1), or
- "edge" (2)
* ``X`` is a data tensor. If ``target`` is "edge", then ``X.shape[0]``
should equal the number of edges of ``G``. Otherwise that should
equal the number of nodes of ``G``.
2. Perform the optional reduction step on ``C`` computed previously.
* If ``reducer`` is "none", then no reduction is performed and we return
the per-edge result ``C`` directly,::
out[e] = C[e]
* Otherwise, the per-edge result ``C`` is reduced into per-node result
according to edge destinations, in a similar fashion as
``unsorted_segment_XXX`` in Tensorflow or ``scatter_XXX`` in PyTorch
or PyTorch-Scatter. For all ``v`` that have incoming edges,::
out[v] = reducer_{e: (u, v, e) in G} C[e]
Partial reads/writes
--------------------
Optionally, one can provide which rows to read from ``X`` with ``X_rows``,
which is a 1D integer array. Similarly, one can provide which rows to
write to ``out`` with ``out_rows``, which is again a 1D integer array.
Concretely,
* Instead of from ``X``, ``C`` would be copied from ``X[X_rows]``. This
implies that
* ``X`` no longer needs to have the same number of rows as the number of
nodes or edges in ``G``. However, ``X_rows`` must have the same
number of elements as the number of nodes or edges in ``G``.
* Instead of directly writing to ``out``, it will selectively write some
rows of ``C`` or reduced ``C``,::
out[out_rows[i]] = C[i] if out_rows[i] != -1
Or
out[out_rows[i]] = reducer_{e: (u, v, e) in G} C[e]
Parameters
----------
reducer : str
The type of the reducer ("sum", "max", "min", "mean", "prod", "none").
If the reducer is "none", the output is an edge feature tensor.
Otherwise, a node feature tensor is returned.
G : GraphIndex
The graph
target : int
Choice of source, destination, or edge ID for edges to index in data
tensor.
X : NDArray
Data tensor.
out : NDArray (output)
Output tensor. The result will be written there in place.
X_rows : NDArray, optional
The rows to read from X.
out_rows : NDArray, optional
The rows to write to output tensor.
"""
if X_rows is None:
X_rows = empty([])
if out_rows is None:
out_rows = empty([])
_CAPI_DGLKernelCopyReduce(
reducer, G._handle, int(target),
X, out, X_rows, out_rows)
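# A minimal usage sketch (again with caller-prepared NDArrays and `out` having
# one row per node): copying source-node features onto edges and summing them
# by destination gives the classic copy_src + sum aggregation,
#
#   copy_reduce("sum", gidx, 0, node_feat, out)   # 0 = source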
# pylint: disable=invalid-name
def backward_copy_reduce(reducer, G, target,
X, out,
grad_out, grad_X,
X_rows=None, out_rows=None):
"""Compute the gradient of ``copy_reduce`` w.r.t. ``X`` and store it in
``grad_X``.
See ``copy_reduce`` for forward propagation and partial reads/writes.
Parameters
----------
reducer : str
The type of the reducer ("sum", "max", "min", "mean", "prod", "none").
If the reducer is "none", the output is an edge feature tensor.
Otherwise, a node feature tensor is returned.
G : GraphIndex
The graph
target : int
Choice of source, destination, or edge ID for edges to index in data
tensor.
X : NDArray
Data tensor.
out : NDArray
Output tensor computed in the forward pass.
grad_out : NDArray
Gradient w.r.t. ``out``.
grad_X : NDArray (output)
Gradient w.r.t. ``X``. The result will be written there in place.
X_rows : NDArray, optional
The rows read from X.
out_rows : NDArray, optional
The rows written to output tensor.
"""
if X_rows is None:
X_rows = empty([])
if out_rows is None:
out_rows = empty([])
_CAPI_DGLKernelBackwardCopyReduce(
reducer, G._handle, int(target),
X, out, grad_out, grad_X,
X_rows, out_rows)
_init_api("dgl.kernel")
"""Torch modules for graph related softmax.""" """Torch modules for graph related softmax."""
# pylint: disable= no-member, arguments-differ # pylint: disable= no-member, arguments-differ
import torch as th import torch as th
from torch import nn
from ... import backend as F
from ... import utils
from ... import function as fn from ... import function as fn
from ...utils import get_ndata_name
__all__ = ['EdgeSoftmax'] __all__ = ['EdgeSoftmax', 'edge_softmax']
class EdgeSoftmax(nn.Module):
class EdgeSoftmax(object):
r"""Apply softmax over signals of incoming edges. r"""Apply softmax over signals of incoming edges.
For a node :math:`i`, edgesoftmax is an operation of computing For a node :math:`i`, edgesoftmax is an operation of computing
...@@ -24,22 +25,16 @@ class EdgeSoftmax(nn.Module): ...@@ -24,22 +25,16 @@ class EdgeSoftmax(nn.Module):
`Graph Attention Network <https://arxiv.org/pdf/1710.10903.pdf>`__ where `Graph Attention Network <https://arxiv.org/pdf/1710.10903.pdf>`__ where
the attention weights are computed with such an edgesoftmax operation. the attention weights are computed with such an edgesoftmax operation.
""" """
def __init__(self):
super(EdgeSoftmax, self).__init__() def __call__(self, graph, logits):
# compute the softmax
self._logits_name = "_logits"
self._max_logits_name = "_max_logits"
self._normalizer_name = "_norm"
def forward(self, logits, graph):
r"""Compute edge softmax. r"""Compute edge softmax.
Parameters Parameters
---------- ----------
graph : DGLGraph
The graph to perform edge softmax
logits : torch.Tensor logits : torch.Tensor
The input edge feature The input edge feature
graph : DGLGraph
The graph.
Returns Returns
------- -------
...@@ -50,46 +45,89 @@ class EdgeSoftmax(nn.Module): ...@@ -50,46 +45,89 @@ class EdgeSoftmax(nn.Module):
Notes Notes
----- -----
* Input shape: :math:`(N, *, 1)` where * means any number of additional * Input shape: :math:`(N, *, 1)` where * means any number of
dimensions, :math:`N` is the number of edges. additional dimensions, :math:`N` is the number of edges.
* Unnormalized scores shape: :math:`(N, *, 1)` where all but the last * Unnormalized scores shape: :math:`(N, *, 1)` where all but the
dimension are the same shape as the input. last dimension are the same shape as the input.
* Normalizer shape: :math:`(M, *, 1)` where :math:`M` is the number of * Normalizer shape: :math:`(M, *, 1)` where :math:`M` is the number
nodes and all but the first and the last dimensions are the same as of nodes and all but the first and the last dimensions are the
the input. same as the input.
Note that this computation is still one step away from getting real softmax Note that this computation is still one step away from getting real
results. The last step can be proceeded as follows: softmax results. The last step can be proceeded as follows:
>>> import dgl.function as fn >>> import dgl.function as fn
>>> >>> scores, normalizer = EdgeSoftmax(logits, graph)
>>> scores, normalizer = EdgeSoftmax(...).forward(logits, graph)
>>> graph.edata['a'] = scores >>> graph.edata['a'] = scores
>>> graph.ndata['normalizer'] = normalizer >>> graph.ndata['normalizer'] = normalizer
>>> graph.apply_edges(lambda edges : {'a' : edges.data['a'] / edges.dst['normalizer']}) >>> graph.apply_edges(
lambda edges: {'a': edges.data['a'] / edges.dst['normalizer']})
We left this last step to users as depending on the particular use case, We left this last step to users as depending on the particular use
this step can be combined with other computation at once. case, this step can be combined with other computation at once.
"""
num_nodes = graph.number_of_nodes()
ctx = utils.to_dgl_context(F.context(logits))
gidx = graph._graph.get_immutable_gidx(ctx)
_, dst, _ = graph._graph.edges()
dst = dst.tousertensor(F.context(logits))
empty_map = (None, None)
max_logits_ = F.copy_reduce("max", gidx, fn.TargetCode.EDGE, logits,
num_nodes, empty_map, empty_map)
logits = (logits - max_logits_.index_select(0, dst)).exp()
norm = F.copy_reduce("sum", gidx, fn.TargetCode.EDGE, logits,
num_nodes, empty_map, empty_map)
return logits / norm.index_select(0, dst)
class EdgeSoftmax1(th.autograd.Function):
"""EdgeSoftmax implementation with DGL message passing APIs"""
@staticmethod
def forward(ctx, g, score):
"""
score = dgl.EData(g, score)
score_max = score.dst_max() # of type dgl.NData
score = score - score_max # edge_sub_dst, ret dgl.EData
score_sum = score.dst_sum() # of type dgl.NData
out = score / score_sum # edge_div_dst, ret dgl.EData
return out.data
"""
score_name = utils.get_edata_name(g, 'score')
tmp_name = utils.get_ndata_name(g, 'tmp')
out_name = utils.get_edata_name(g, 'out')
g.edata[score_name] = score
g.update_all(fn.copy_e(score_name, 'm'), fn.max('m', tmp_name))
g.apply_edges(fn.e_sub_v(score_name, tmp_name, out_name))
g.edata[out_name] = th.exp(g.edata[out_name])
g.update_all(fn.copy_e(out_name, 'm'), fn.sum('m', tmp_name))
g.apply_edges(fn.e_div_v(out_name, tmp_name, out_name))
g.edata.pop(score_name)
g.ndata.pop(tmp_name)
out = g.edata.pop(out_name)
ctx.backward_cache = (g, out)
return out
@staticmethod
def backward(ctx, grad_out):
"""
g, out = ctx.backward_cache
grad_out = dgl.EData(g, grad_out)
out = dgl.EData(g, out)
sds = out * grad_out # type dgl.EData
sds_sum = sds.dst_sum() # type dgl.NData
grad_score = sds - sds * sds_sum # multiple expressions
return grad_score.data
""" """
self._logits_name = get_ndata_name(graph, self._logits_name) g, out = ctx.backward_cache
self._max_logits_name = get_ndata_name(graph, self._max_logits_name) out_name = utils.get_edata_name(g, 'out')
self._normalizer_name = get_ndata_name(graph, self._normalizer_name) accum_name = utils.get_ndata_name(g, 'accum')
grad_score_name = utils.get_edata_name(g, 'grad_score')
graph.edata[self._logits_name] = logits g.edata[out_name] = out
g.edata[grad_score_name] = out * grad_out
# compute the softmax g.update_all(fn.copy_e(grad_score_name, 'm'), fn.sum('m', accum_name))
graph.update_all(fn.copy_edge(self._logits_name, self._logits_name), g.apply_edges(fn.e_mul_v(out_name, accum_name, out_name))
fn.max(self._logits_name, self._max_logits_name)) grad_score = g.edata[grad_score_name] - g.edata[out_name]
# minus the max and exp return None, grad_score
graph.apply_edges(
lambda edges: {self._logits_name : th.exp(edges.data[self._logits_name] -
edges.dst[self._max_logits_name])}) edge_softmax = EdgeSoftmax1.apply # pylint: disable=invalid-name
# pop out temporary feature _max_logits, otherwise get_ndata_name could have huge overhead
graph.ndata.pop(self._max_logits_name)
# compute normalizer
graph.update_all(fn.copy_edge(self._logits_name, self._logits_name),
fn.sum(self._logits_name, self._normalizer_name))
return graph.edata.pop(self._logits_name), graph.ndata.pop(self._normalizer_name)
def __repr__(self):
return 'EdgeSoftmax()'
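# A minimal usage sketch for the functional form above (assuming `g` is a
# DGLGraph and the score tensor has one unnormalized value per edge):
#
#   scores = th.randn(g.number_of_edges(), 1)
#   alpha = edge_softmax(g, scores)   # attention weights normalized over incoming edges
#
# This is the normalization used by attention models such as GAT.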
...@@ -152,7 +152,7 @@ class NodeFlow(DGLBaseGraph):
block_id = self._get_block_id(block_id)
return int(self._block_offsets[block_id + 1]) - int(self._block_offsets[block_id])
def copy_from_parent(self, node_embed_names=ALL, edge_embed_names=ALL, ctx=None):
"""Copy node/edge features from the parent graph.
Parameters
...@@ -161,6 +161,8 @@ class NodeFlow(DGLBaseGraph):
The names of embeddings in each layer.
edge_embed_names : a list of lists of strings, optional
The names of embeddings in each block.
ctx : Context, optional
The device to copy tensors to. If None, features stay on their original device.
"""
if self._parent._node_frame.num_rows != 0 and self._parent._node_frame.num_columns != 0:
if is_all(node_embed_names):
...@@ -244,7 +246,8 @@ class NodeFlow(DGLBaseGraph):
Tensor
The parent node id array.
"""
nid = utils.toindex(nid)
return self._node_mapping.tousertensor()[nid.tousertensor()]
def map_to_parent_eid(self, eid):
"""This maps the child edge Ids to the parent Ids.
...@@ -259,7 +262,8 @@ class NodeFlow(DGLBaseGraph):
Tensor
The parent edge id array.
"""
eid = utils.toindex(eid)
return self._edge_mapping.tousertensor()[eid.tousertensor()]
def map_from_parent_nid(self, layer_id, parent_nids):
"""Map parent node Ids to NodeFlow node Ids in a certain layer.
...@@ -398,13 +402,18 @@ class NodeFlow(DGLBaseGraph):
assert F.asnumpy(F.sum(ret == -1, 0)) == 0, "The eid in the parent graph is invalid."
return ret
def block_edges(self, block_id, remap=False):
"""Return the edges in a block.
If remap is True, the returned indices u, v, eid will be remapped to local
indices (i.e. starting from 0).
Parameters
----------
block_id : int
The specified block to return the edges.
remap : bool
Remap indices if True
Returns
-------
...@@ -420,7 +429,8 @@ class NodeFlow(DGLBaseGraph):
rst = _CAPI_NodeFlowGetBlockAdj(self._graph._handle, "coo",
int(layer0_size),
int(self._layer_offsets[block_id + 1]),
int(self._layer_offsets[block_id + 2]),
remap)
idx = utils.toindex(rst(0)).tousertensor()
eid = utils.toindex(rst(1))
num_edges = int(len(idx) / 2)
...@@ -455,7 +465,8 @@ class NodeFlow(DGLBaseGraph):
rst = _CAPI_NodeFlowGetBlockAdj(self._graph._handle, fmt,
int(layer0_size),
int(self._layer_offsets[block_id + 1]),
int(self._layer_offsets[block_id + 2]),
True)
num_rows = self.layer_size(block_id + 1)
num_cols = self.layer_size(block_id)
...@@ -515,7 +526,7 @@ class NodeFlow(DGLBaseGraph):
if shuffle is not required.
"""
block_id = self._get_block_id(block_id)
src, dst, eid = self.block_edges(block_id, remap=True)
src = F.copy_to(src, ctx)  # the index of the ctx will be cached
dst = F.copy_to(dst, ctx)  # the index of the ctx will be cached
eid = F.copy_to(eid, ctx)  # the index of the ctx will be cached
...@@ -740,7 +751,7 @@ class NodeFlow(DGLBaseGraph):
assert func is not None
if is_all(edges):
u, v, _ = self.block_edges(block_id, remap=True)
u = utils.toindex(u)
v = utils.toindex(v)
eid = utils.toindex(slice(0, self.block_size(block_id)))
...@@ -827,8 +838,8 @@ class NodeFlow(DGLBaseGraph):
assert len(u) > 0, "block_compute must run on edges"
u = utils.toindex(self._glb2lcl_nid(u.tousertensor(), block_id))
v = utils.toindex(self._glb2lcl_nid(v.tousertensor(), block_id + 1))
dest_nodes = utils.toindex(
self._glb2lcl_nid(dest_nodes.tousertensor(), block_id + 1))
eid = utils.toindex(self._glb2lcl_eid(eid.tousertensor(), block_id))
with ir.prog() as prog:
...@@ -909,15 +920,22 @@ def _copy_to_like(arr1, arr2):
return F.copy_to(arr1, F.context(arr2))
def _get_frame(frame, names, ids, ctx):
col_dict = {}
for name in names:
col = frame[name][_copy_to_like(ids, frame[name])]
if ctx:
col = F.copy_to(col, ctx)
col_dict[name] = col
if len(col_dict) == 0:
return FrameRef(Frame(num_rows=len(ids)))
else:
return FrameRef(Frame(col_dict))
def _copy_frame(frame, ctx):
new_frame = {}
for name in frame:
new_frame[name] = F.copy_to(frame[name], ctx) if ctx else frame[name]
return new_frame
def _update_frame(frame, names, ids, new_frame):
......
...@@ -3,8 +3,6 @@
from __future__ import absolute_import
from abc import abstractmethod
import functools
import operator
from ... import backend as F
from ...frame import FrameRef, Frame
...@@ -19,8 +17,6 @@ __all__ = [
'OpCode', 'Executor',
'NodeUDFExecutor', 'NODE_UDF',
'EdgeUDFExecutor', 'EDGE_UDF',
'SPMVExecutor', 'SPMV',
'SPMVWithDataExecutor', 'SPMV_WITH_DATA',
'ReadExecutor', 'READ',
'ReadColExecutor', 'READ_COL',
'ReadRowExecutor', 'READ_ROW',
...@@ -34,15 +30,16 @@ __all__ = [
'AppendRow_Executor', 'APPEND_ROW_',
'WriteRowInplace_Executor', 'WRITE_ROW_INPLACE_',
'ClearFrame_Executor', 'CLEAR_FRAME_',
'BinaryReduceExecutor', 'BINARY_REDUCE',
'CopyReduceExecutor', 'COPY_REDUCE',
]
class OpCode(object):
"""Opcode for all the executor types."""
# immutable op
NODE_UDF = 0
EDGE_UDF = 1
SPMV = 2
SPMV_WITH_DATA = 3
READ = 4
READ_COL = 5
READ_ROW = 6
...@@ -58,6 +55,10 @@ class OpCode(object):
APPEND_ROW_ = 25
WRITE_ROW_INPLACE_ = 26
CLEAR_FRAME_ = 27
# DGL kernels
BINARY_REDUCE = 50
COPY_REDUCE = 51
class Executor(object):
"""Base executor class.
...@@ -422,181 +423,6 @@ def READ_ROW(fd, row, ret=None):
get_current_prog().issue(reg['executor_cls'](fd, row, ret))
return ret
class SPMVExecutor(Executor):
"""Executor for sparse-matrix-dense-matrix multiply.
Parameters
----------
spA : var.Var
Variable for sparse matrix lambda. The lambda returns the sparse matrix
given a context object.
B : var.Var
Variable for the dense feature tensor.
ret : var.Var
Variable for the result.
"""
def __init__(self, spA, B, ret):
self.spA = spA
self.B = B
self.ret = ret
def opcode(self):
return OpCode.SPMV
def arg_vars(self):
return [self.spA, self.B]
def ret_var(self):
return self.ret
def run(self):
spA_ctx_fn = self.spA.data
B = self.B.data
ctx = F.context(B)
spA = spA_ctx_fn(ctx)
if F.ndim(B) == 1:
# B is a vector, append a (1,) dim at the end
B = F.unsqueeze(B, 1)
C = F.spmm(spA, B)
C = F.squeeze(C, 1)
elif F.ndim(B) > 2:
# Flatten the dim 1:~
B_shape = F.shape(B)
feat_shape = B_shape[1:]
tmp_B_shape = (B_shape[0], functools.reduce(operator.mul, feat_shape, 1))
B = F.reshape(B, tmp_B_shape)
C = F.spmm(spA, B)
C_shape = (F.shape(C)[0],) + feat_shape
C = F.reshape(C, C_shape)
else:
C = F.spmm(spA, B)
self.ret.data = C
IR_REGISTRY[OpCode.SPMV] = {
'name' : 'SPMV',
'args_type' : [VarType.SPMAT, VarType.FEAT],
'ret_type' : VarType.FEAT,
'executor_cls' : SPMVExecutor,
}
def SPMV(spA, B, ret=None):
"""Perform sparse-matrix-dense-matrix multiply symbolically.
Parameters
----------
spA : var.Var
Variable for sparse matrix lambda. The lambda returns the sparse matrix
given a context object.
B : var.Var
Variable for the dense feature tensor.
ret : var.Var, optional
Variable for the result. If not given, a new variable will be created.
Returns
-------
var.Var
Variable for the result.
"""
reg = IR_REGISTRY[OpCode.SPMV]
ret = var.new(reg['ret_type']) if ret is None else ret
get_current_prog().issue(reg['executor_cls'](spA, B, ret))
return ret
class SPMVWithDataExecutor(Executor):
"""Executor for sparse-matrix-dense-matrix multiply with provided sparse data.
Parameters
----------
spA : var.Var
Variable for sparse matrix lambda. The lambda returns the sparse matrix
given a context object.
A_data : var.Var
Variable for the sparse matrix data.
B : var.Var
Variable for the dense feature tensor.
ret : var.Var
Variable for the result.
"""
def __init__(self, spA, A_data, B, ret):
self.spA = spA
self.A_data = A_data
self.B = B
self.ret = ret
def opcode(self):
return OpCode.SPMV_WITH_DATA
def arg_vars(self):
return [self.spA, self.A_data, self.B]
def ret_var(self):
return self.ret
def run(self):
spA_ctx_fn = self.spA.data
A_data = self.A_data.data
if F.ndim(A_data) > 1:
# A_data is of shape (E, 1). Squeeze the last dim.
A_data = F.squeeze(A_data, 1)
B = self.B.data
ctx = F.context(B)
spA = spA_ctx_fn(ctx)
spidx = F.sparse_matrix_indices(spA)
shape = F.shape(spA)
# shuffle index is not used
spA, _ = F.sparse_matrix(A_data, spidx, shape)
if F.ndim(B) == 1:
# B is a vector, append a (1,) dim at the end
B = F.unsqueeze(B, 1)
C = F.spmm(spA, B)
C = F.squeeze(C, 1)
elif F.ndim(B) > 2:
# Flatten the dim 1:~
B_shape = F.shape(B)
feat_shape = B_shape[1:]
tmp_B_shape = (B_shape[0], functools.reduce(operator.mul, feat_shape, 1))
B = F.reshape(B, tmp_B_shape)
C = F.spmm(spA, B)
C_shape = (F.shape(C)[0],) + feat_shape
C = F.reshape(C, C_shape)
else:
C = F.spmm(spA, B)
self.ret.data = C
IR_REGISTRY[OpCode.SPMV_WITH_DATA] = {
'name' : 'SPMV_WITH_DATA',
'args_type' : [VarType.SPMAT, VarType.FEAT, VarType.FEAT],
'ret_type' : VarType.FEAT,
'executor_cls' : SPMVWithDataExecutor,
}
def SPMV_WITH_DATA(spA, A_data, B, ret=None):
"""Perform sparse-matrix-dense-matrix multiply with sparse data symbolically.
Parameters
----------
spA : var.Var
Variable for sparse matrix lambda. The lambda returns the sparse matrix
given a context object.
A_data : var.Var
Variable for the sparse matrix data.
B : var.Var
Variable for the dense feature tensor.
ret : var.Var, optional
Variable for the result. If not given, a new variable will be created.
Returns
-------
var.Var
Variable for the result.
"""
reg = IR_REGISTRY[OpCode.SPMV_WITH_DATA]
ret = var.new(reg['ret_type']) if ret is None else ret
get_current_prog().issue(reg['executor_cls'](spA, A_data, B, ret))
return ret
class MergeRowExecutor(Executor):
"""Executor for merge row data according to the given order.
...@@ -1169,3 +995,254 @@ def CLEAR_FRAME_(fd):
"""
reg = IR_REGISTRY[OpCode.CLEAR_FRAME_]
get_current_prog().issue(reg['executor_cls'](fd))
class BinaryReduceExecutor(Executor):
"""Executor for BINARY_REDUCE
Parameters
----------
reducer : str
String representing reduction to perform, can be "sum", "max", "min",
"mean", "prod", "none" (no reduction)
binary_op : str
String representing binary operation to perform, can be "add", "mul",
"sub", "div", "dot"
graph : var.Var
Variable for graph index lambda. The lambda returns the immutable graph
index given a context object.
lhs: int
The lhs target (src, dst, edge)
rhs: int
The rhs target (src, dst, edge)
lhs_data : var.Var
Variable for the lhs data
rhs_data : var.Var
Variable for the rhs data
out_size : int
Output size
lhs_map : var.Var
Variable for mapping lambda. The lambda returns the lhs id mapping
array on given context
rhs_map : var.Var
Variable for mapping lambda. The lambda returns the rhs id mapping
array on given context
out_map : var.Var
Variable for mapping lambda. The lambda returns the output id mapping
array on given context
ret : var.Var
Variable for the result.
"""
def __init__(self, reducer, binary_op, graph, lhs, rhs, lhs_data,
rhs_data, out_size, lhs_map, rhs_map, out_map, ret):
self.reducer = reducer
self.binary_op = binary_op
self.graph = graph
self.lhs = lhs
self.rhs = rhs
self.lhs_data = lhs_data
self.rhs_data = rhs_data
self.out_size = out_size
self.lhs_map = lhs_map
self.rhs_map = rhs_map
self.out_map = out_map
self.ret = ret
def opcode(self):
return OpCode.BINARY_REDUCE
def arg_vars(self):
return [self.reducer, self.binary_op, self.graph, self.lhs, self.rhs,
self.lhs_data, self.rhs_data, self.out_size, self.lhs_map,
self.rhs_map, self.out_map]
def ret_var(self):
return self.ret
def run(self):
lhs_data = self.lhs_data.data
rhs_data = self.rhs_data.data
ctx = utils.to_dgl_context(F.context(lhs_data))
graph = self.graph.data(ctx)
lhs_map = self.lhs_map.data(ctx) if self.lhs_map.data else None
rhs_map = self.rhs_map.data(ctx) if self.rhs_map.data else None
out_map = self.out_map.data(ctx) if self.out_map.data else None
if not isinstance(lhs_map, tuple):
lhs_map = (lhs_map, lhs_map)
if not isinstance(rhs_map, tuple):
rhs_map = (rhs_map, rhs_map)
if not isinstance(out_map, tuple):
out_map = (out_map, out_map)
self.ret.data = F.binary_reduce(
self.reducer, self.binary_op, graph, self.lhs, self.rhs,
lhs_data, rhs_data, self.out_size, lhs_map, rhs_map, out_map)
IR_REGISTRY[OpCode.BINARY_REDUCE] = {
'name': 'BINARY_REDUCE',
'args_type': [VarType.STR, VarType.STR, VarType.GRAPH, VarType.INT,
VarType.INT, VarType.FEAT, VarType.FEAT, VarType.INT,
VarType.MAP, VarType.MAP, VarType.MAP],
'ret_type': VarType.FEAT,
'executor_cls': BinaryReduceExecutor,
}
def BINARY_REDUCE(reducer, binary_op, graph, lhs, rhs, lhs_data, rhs_data,
out_size, lhs_map, rhs_map, out_map, ret=None):
"""Perform BINARY_REDUCE symbolically.
Parameters
----------
reducer : str
String representing reduction to perform, can be "sum", "max", "min",
"mean", "prod", "none" (no reduction)
binary_op : str
String representing binary operation to perform, can be "add", "mul",
"sub", "div", "dot"
graph : var.Var
Variable for graph index lambda. The lambda returns the immutable graph
index given a context object.
lhs: int
The lhs target (src, dst, edge)
rhs: int
The rhs target (src, dst, edge)
lhs_data : var.Var
Variable for the lhs data
rhs_data : var.Var
Variable for the rhs data
out_size : int
Output size
lhs_map : var.Var
Variable for mapping lambda. The lambda returns the lhs id mapping
array on given context
rhs_map : var.Var
Variable for mapping lambda. The lambda returns the rhs id mapping
array on given context
out_map : var.Var
Variable for mapping lambda. The lambda returns the output id mapping
array on given context
ret : var.Var, optional
Variable for the result. If not given, a new variable will be created.
Returns
-------
var.Var
Variable for the result.
"""
reg = IR_REGISTRY[OpCode.BINARY_REDUCE]
ret = var.new(reg['ret_type']) if ret is None else ret
get_current_prog().issue(reg['executor_cls'](
reducer, binary_op, graph, lhs, rhs, lhs_data, rhs_data, out_size,
lhs_map, rhs_map, out_map, ret))
return ret
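# Rough usage sketch (illustrative, not part of the module): inside a program
# opened with ir.prog(), the arguments would be wrapped as IR variables, e.g.
# assuming `graph_lambda` returns the immutable graph index for a given context
# and `src_feat`, `edge_feat`, `num_nodes` are prepared by the caller:
#
#   var_graph = var.GRAPH(data=graph_lambda)
#   var_lhs = var.new(VarType.FEAT, src_feat)
#   var_rhs = var.new(VarType.FEAT, edge_feat)
#   var_out = BINARY_REDUCE("sum", "mul", var_graph, 0, 2,   # 0 = source, 2 = edge
#                           var_lhs, var_rhs, num_nodes,
#                           var.MAP(), var.MAP(), var.MAP())
#
# The issued executor runs when Runtime.run() walks the program.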
class CopyReduceExecutor(Executor):
"""Executor for COPY_REDUCE
Parameters
----------
reducer : str
String representing reduction to perform, can be "sum", "max", "min",
"mean", "prod", "none" (no reduction)
graph : var.Var
Variable for graph index lambda. The lambda returns the immutable graph
index given a context object.
target: int
The input target (src, dst, edge)
in_data : var.Var
Variable for the input data
out_size : int
Output size
in_map : var.Var
Variable for mapping lambda. The lambda returns the input id mapping
array on given context
out_map : var.Var
Variable for mapping lambda. The lambda returns the output id mapping
array on given context
ret : var.Var
Variable for the result.
"""
def __init__(self, reducer, graph, target, in_data, out_size, in_map,
out_map, ret):
self.reducer = reducer
self.graph = graph
self.target = target
self.in_data = in_data
self.out_size = out_size
self.in_map = in_map
self.out_map = out_map
self.ret = ret
def opcode(self):
return OpCode.COPY_REDUCE
def arg_vars(self):
return [self.reducer, self.graph, self.target, self.in_data,
self.out_size, self.in_map, self.out_map]
def ret_var(self):
return self.ret
def run(self):
in_data = self.in_data.data
ctx = utils.to_dgl_context(F.context(in_data))
graph = self.graph.data(ctx)
in_map = self.in_map.data(ctx) if self.in_map.data else None
out_map = self.out_map.data(ctx) if self.out_map.data else None
if not isinstance(in_map, tuple):
in_map = (in_map, in_map)
if not isinstance(out_map, tuple):
out_map = (out_map, out_map)
self.ret.data = F.copy_reduce(
self.reducer, graph, self.target, in_data, self.out_size, in_map,
out_map)
IR_REGISTRY[OpCode.COPY_REDUCE] = {
'name': 'COPY_REDUCE',
'args_type': [VarType.STR, VarType.GRAPH, VarType.INT, VarType.FEAT, VarType.INT,
VarType.MAP, VarType.MAP],
'ret_type': VarType.FEAT,
'executor_cls': CopyReduceExecutor,
}
def COPY_REDUCE(reducer, graph, target, in_data, out_size, in_map, out_map,
ret=None):
"""Perform COPY_REDUCE symbolically.
Parameters
----------
reducer : str
String representing reduction to perform, can be "sum", "max", "min",
"mean", "prod", "none" (no reduction)
graph : var.Var
Variable for graph index lambda. The lambda returns the immutable graph
index given a context object.
target: int
The input target (src, dst, edge)
in_data : var.Var
Variable for the input data
out_size : int
Output size
in_map : var.Var
Variable for mapping lambda. The lambda returns the input id mapping
array on given context
out_map : var.Var
Variable for mapping lambda. The lambda returns the output id mapping
array on given context
ret : var.Var, optional
Variable for the result. If not given, a new variable will be created.
Returns
-------
var.Var
Variable for the result.
"""
reg = IR_REGISTRY[OpCode.COPY_REDUCE]
ret = var.new(reg['ret_type']) if ret is None else ret
get_current_prog().issue(reg['executor_cls'](
reducer, graph, target, in_data, out_size, in_map, out_map, ret))
return ret
...@@ -11,26 +11,30 @@ class VarType(object):
FEAT = 0
FEAT_DICT = 1
# Types for concrete objects (i.e., they must have values).
GRAPH = 2
IDX = 3
STR = 4
FUNC = 5
MAP = 6
INT = 7
VAR_TYPE_NAME_MAP = [
'Feat',
'FeatDict',
'GRAPH',
'Idx',
'Str',
'Func',
'Map',
'Int',
]
class Var(object):
"""Class for variables in IR.
Variables represent data in the IR. A variable can contain concrete values.
Otherwise, it can act as a "symbol", whose values are not materialized at
the moment, but later.
Parameters
----------
...@@ -42,6 +46,7 @@ class Var(object):
The data.
"""
__slots__ = ['name', 'typecode', 'data']
def __init__(self, name, typecode, data):
self.name = name
self.typecode = typecode
...@@ -73,9 +78,9 @@ def FEAT_DICT(data=None, name=None):
"""Create a variable for feature dict."""
return new(VarType.FEAT_DICT, data, name)
def GRAPH(data=None, name=None):
"""Create a variable for graph index lambda."""
return new(VarType.GRAPH, data, name)
def IDX(data=None, name=None):
"""Create a variable for index."""
...@@ -88,3 +93,11 @@ def STR(data=None, name=None):
def FUNC(data=None, name=None):
"""Create a variable for function."""
return new(VarType.FUNC, data, name)
def MAP(data=None, name=None):
"""Create a variable for mapping lambda"""
return new(VarType.MAP, data, name)
def INT(data=None, name=None):
"""Create a variable for int value"""
return new(VarType.INT, data, name)
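# Illustrative use of the variable kinds added above (the graph index and the
# id mapping are placeholders supplied by the scheduler):
#
#   g_var = GRAPH(data=lambda ctx: gidx.get_immutable_gidx(ctx))
#   map_var = MAP()              # symbolic id mapping, materialized later
#   size_var = INT(data=out_size)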
"""DGL mini-runtime.""" """DGL mini-runtime."""
class Runtime(object): class Runtime(object):
"""The mini runtime class.""" """The mini runtime class."""
@staticmethod @staticmethod
def run(prog): def run(prog):
"""Run the given program.""" """Run the given program."""
for exe in prog.execs: for exe in prog.execs:
#prog.pprint_exe(exe) # prog.pprint_exe(exe)
exe.run() exe.run()
...@@ -336,7 +336,7 @@ def build_relabel_map(x, is_sorted=False):
>>> n2o
[1, 3, 5, 6]
>>> o2n
[n/a, 0, n/a, 1, n/a, 2, 3]
"n/a" will be filled with 0
...@@ -490,6 +490,27 @@ def get_ndata_name(g, name):
name += '_'
return name
def get_edata_name(g, name):
"""Return an edge data name that does not exist in the given graph.
The given name is directly returned if it does not exist in the given graph.
Parameters
----------
g : DGLGraph
The graph.
name : str
The proposed name.
Returns
-------
str
The edge data name that does not exist.
"""
while name in g.edata:
name += '_'
return name
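# For example, assuming `g.edata` already holds a field named 'w':
#   >>> get_edata_name(g, 'w')
#   'w_'
#   >>> get_edata_name(g, 'h')   # an unused name is returned unchanged
#   'h'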
def unwrap_to_ptr_list(wrapper):
"""Convert the internal vector wrapper to a python list of ctypes.c_void_p.
...@@ -513,3 +534,19 @@ def unwrap_to_ptr_list(wrapper):
rst = [ctypes.c_void_p(x) for x in data.contents]
_api_internal._FreeVectorWrapper(wrapper)
return rst
def to_dgl_context(ctx):
"""Convert a backend context to DGLContext"""
device_type = nd.DGLContext.STR2MASK[F.device_type(ctx)]
device_id = F.device_id(ctx)
return nd.DGLContext(device_type, device_id)
def to_nbits_int(tensor, nbits):
"""Change the dtype of integer tensor
The dtype of returned tensor uses nbits, nbits can only be 32 or 64
"""
assert(nbits in (32, 64)), "nbits can either be 32 or 64"
if nbits == 32:
return F.astype(tensor, F.int32)
else:
return F.astype(tensor, F.int64)
...@@ -25,6 +25,32 @@ IdArray Clone(IdArray arr) {
return ret;
}
IdArray AsNumBits(IdArray arr, uint8_t bits) {
if (arr->dtype.bits == bits) {
return arr;
} else {
const int64_t len = arr->shape[0];
IdArray ret = IdArray::Empty({len},
DLDataType{kDLInt, bits, 1}, DLContext{kDLCPU, 0});
if (arr->dtype.bits == 32 && bits == 64) {
const int32_t* arr_data = static_cast<int32_t*>(arr->data);
int64_t* ret_data = static_cast<int64_t*>(ret->data);
for (int64_t i = 0; i < len; ++i) {
ret_data[i] = arr_data[i];
}
} else if (arr->dtype.bits == 64 && bits == 32) {
const int64_t* arr_data = static_cast<int64_t*>(arr->data);
int32_t* ret_data = static_cast<int32_t*>(ret->data);
for (int64_t i = 0; i < len; ++i) {
ret_data[i] = arr_data[i];
}
} else {
LOG(FATAL) << "Invalid type conversion.";
}
return ret;
}
}
IdArray Add(IdArray lhs, IdArray rhs) {
IdArray ret = NewIdArray(lhs->shape[0]);
const dgl_id_t* lhs_data = static_cast<dgl_id_t*>(lhs->data);
......
...@@ -12,6 +12,18 @@
#include <algorithm>
#include <vector>
namespace {
/*! \brief Check whether two device contexts are the same.*/
inline bool operator == (const DLContext& ctx1, const DLContext& ctx2) {
return ctx1.device_type == ctx2.device_type && ctx1.device_id == ctx2.device_id;
}
/*! \brief Output the string representation of device context.*/
inline std::ostream& operator << (std::ostream& os, const DLContext& ctx) {
return os << "" << ctx.device_type << ":" << ctx.device_id << "";
}
} // namespace
namespace dgl {
// Graph handler type
......
...@@ -137,15 +137,11 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCreate")
const bool readonly = args[4];
GraphHandle ghandle;
if (readonly) {
// TODO(minjie): The array copy here is unnecessary and adds extra overhead.
// However, with MXNet backend, the memory would be corrupted if we directly
// save the passed-in ndarrays into DGL's graph object. We hope MXNet team
// could help look into this.
if (multigraph == kBoolUnknown) {
COOPtr coo(new COO(num_nodes, src_ids, dst_ids));
ghandle = new ImmutableGraph(coo);
} else {
COOPtr coo(new COO(num_nodes, src_ids, dst_ids, multigraph));
ghandle = new ImmutableGraph(coo);
}
} else {
...@@ -170,14 +166,10 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCSRCreate")
for (size_t i = 0; i < edge_ids->shape[0]; i++)
edge_data[i] = i;
if (shared_mem_name.empty()) {
// TODO(minjie): The array copy here is unnecessary and adds extra overhead.
// However, with MXNet backend, the memory would be corrupted if we directly
// save the passed-in ndarrays into DGL's graph object. We hope MXNet team
// could help look into this.
if (multigraph == kBoolUnknown) {
csr.reset(new CSR(indptr, indices, edge_ids));
} else {
csr.reset(new CSR(indptr, indices, edge_ids, multigraph));
}
} else {
if (multigraph == kBoolUnknown) {
...@@ -200,7 +192,7 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCSRCreateMMap")
const std::string shared_mem_name = args[0];
const int64_t num_vertices = args[1];
const int64_t num_edges = args[2];
const bool multigraph = args[3];
const std::string edge_dir = args[4];
// TODO(minjie): how to know multigraph
CSRPtr csr(new CSR(shared_mem_name, num_vertices, num_edges, multigraph));
...@@ -523,6 +515,54 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphGetAdj")
*rv = ConvertAdjToPackedFunc(res);
});
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLToImmutable")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const GraphInterface *ptr = static_cast<GraphInterface *>(ghandle);
GraphHandle newhandle = new ImmutableGraph(ImmutableGraph::ToImmutable(ptr));
*rv = newhandle;
});
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphContext")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const GraphInterface *ptr = static_cast<GraphInterface *>(ghandle);
*rv = ptr->Context();
});
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLImmutableGraphCopyTo")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const int device_type = args[1];
const int device_id = args[2];
DLContext ctx;
ctx.device_type = static_cast<DLDeviceType>(device_type);
ctx.device_id = device_id;
const GraphInterface *ptr = static_cast<GraphInterface *>(ghandle);
const ImmutableGraph *ig = dynamic_cast<const ImmutableGraph*>(ptr);
CHECK(ig) << "Invalid argument: must be an immutable graph object.";
GraphHandle newhandle = new ImmutableGraph(ig->CopyTo(ctx));
*rv = newhandle;
});
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphNumBits")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const GraphInterface *ptr = static_cast<GraphInterface *>(ghandle);
*rv = ptr->NumBits();
});
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLImmutableGraphAsNumBits")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
int bits = args[1];
const GraphInterface *ptr = static_cast<GraphInterface *>(ghandle);
const ImmutableGraph *ig = dynamic_cast<const ImmutableGraph*>(ptr);
CHECK(ig) << "Invalid argument: must be an immutable graph object.";
GraphHandle newhandle = new ImmutableGraph(ig->AsNumBits(bits));
*rv = newhandle;
});
DGL_REGISTER_GLOBAL("transform._CAPI_DGLToSimpleGraph") DGL_REGISTER_GLOBAL("transform._CAPI_DGLToSimpleGraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) { .set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0]; GraphHandle ghandle = args[0];
......