"tools/git@developer.sourcefind.cn:OpenDAS/dlib.git" did not exist on "c335bf6756d85825ae3c89b9d6fed027751f8606"
Unverified commit c81efdf2, authored by Jinjing Zhou, committed by GitHub

Remove deprecated kernels (#3316)

* remove

* remove

* fix

* remove

* remove
parent 75d793a1
/*!
 * Copyright (c) 2019 by Contributors
 * \file kernel/cuda/utils.cu
 * \brief Utility function implementations on CUDA
 */
#include "../../runtime/cuda/cuda_common.h"
#include "../utils.h"

namespace dgl {
namespace kernel {
namespace utils {

template <typename DType>
__global__ void _FillKernel(DType* ptr, size_t length, DType val) {
  // Grid-stride loop: each thread writes every stride_x-th element,
  // so any grid size covers the whole array.
  int tx = blockIdx.x * blockDim.x + threadIdx.x;
  int stride_x = gridDim.x * blockDim.x;
  while (tx < length) {
    ptr[tx] = val;
    tx += stride_x;
  }
}

template <int XPU, typename DType>
void Fill(const DLContext& ctx, DType* ptr, size_t length, DType val) {
  auto* thr_entry = runtime::CUDAThreadEntry::ThreadLocal();
  int nt = utils::FindNumThreads(length, 1024);
  int nb = (length + nt - 1) / nt;
  CUDA_KERNEL_CALL(_FillKernel, nb, nt, 0, thr_entry->stream, ptr, length, val);
}

template void Fill<kDLGPU, float>(const DLContext& ctx, float* ptr, size_t length, float val);
template void Fill<kDLGPU, double>(const DLContext& ctx, double* ptr, size_t length, double val);

}  // namespace utils
}  // namespace kernel
}  // namespace dgl
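For intuition, here is a minimal Python sketch of the grid-stride pattern `_FillKernel` uses; `grid_stride_fill` and its parameters are illustrative names, not part of DGL. Each virtual thread starts at its global index and advances by the total thread count, so any grid size covers the whole array without overlap.

import numpy as np

def grid_stride_fill(arr, val, num_blocks, threads_per_block):
    """Host-side emulation of the CUDA grid-stride loop in _FillKernel."""
    stride = num_blocks * threads_per_block   # gridDim.x * blockDim.x
    for tid in range(stride):                 # one pass per virtual thread
        # Thread tid writes elements tid, tid + stride, tid + 2*stride, ...
        arr[tid::stride] = val
    return arr

a = grid_stride_fill(np.zeros(10), 7.0, num_blocks=2, threads_per_block=3)
assert (a == 7.0).all()   # all 10 elements covered by 6 virtual threads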
/*!
 * Copyright (c) 2018 by Contributors
 * \file kernel/utils.cc
 * \brief Kernel utilities
 */
#include <dgl/array.h>
#include <vector>
#include <string>
#include "./utils.h"
#include "./binary_reduce_common.h"

namespace dgl {
namespace kernel {
namespace utils {

int FindNumThreads(int dim, int max_nthrs) {
  int ret = max_nthrs;
  while (ret > dim) {
    ret = ret >> 1;
  }
  return ret;
}

int FindNumBlocks(int nblks, int max_nblks) {
  if (nblks < max_nblks)
    return nblks;
  return max_nblks;
}

int64_t ComputeXLength(runtime::NDArray feat_array) {
  int64_t ret = 1;
  for (int i = 1; i < feat_array->ndim; ++i) {
    ret *= feat_array->shape[i];
  }
  return ret;
}

int64_t NElements(const runtime::NDArray& array) {
  if (aten::IsNullArray(array)) {
    return 0;
  } else {
    int64_t ret = 1;
    for (int i = 0; i < array->ndim; ++i) {
      ret *= array->shape[i];
    }
    return ret;
  }
}

int64_t Prod(const std::vector<int64_t>& vec) {
  int64_t ret = 1;
  for (int64_t v : vec) {
    ret *= v;
  }
  return ret;
}

}  // namespace utils
}  // namespace kernel
}  // namespace dgl
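A quick way to sanity-check the launch configuration that `Fill` derives from these helpers is to transcribe them; this Python version mirrors the C++ above but is only a sketch (like the CUDA code, it assumes `max_nthrs` is itself a power of two):

def find_num_threads(dim, max_nthrs=1024):
    # Largest power of two that is <= both dim and max_nthrs.
    ret = max_nthrs
    while ret > dim:
        ret >>= 1
    return ret

def find_num_blocks(nblks, max_nblks):
    return min(nblks, max_nblks)

# Worked example: filling 1000 elements uses 512 threads per block and
# ceil(1000 / 512) = 2 blocks, matching the arithmetic in Fill().
nt = find_num_threads(1000)
nb = (1000 + nt - 1) // nt
assert (nt, nb) == (512, 2)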
/*!
 * Copyright (c) 2018 by Contributors
 * \file kernel/utils.h
 * \brief Kernel utilities
 */
#ifndef DGL_KERNEL_UTILS_H_
#define DGL_KERNEL_UTILS_H_

#include <minigun/csr.h>
#include <dlpack/dlpack.h>
#include <dgl/runtime/ndarray.h>
#include <dgl/array.h>
#include <cstdlib>
#include <vector>

namespace dgl {
namespace kernel {
namespace utils {

/*!
 * \brief Find a number of threads that is no larger than dim or max_nthrs
 *        and is also a power of two.
 */
int FindNumThreads(int dim, int max_nthrs);

/*!
 * \brief Find a number of blocks that is no larger than nblks or max_nblks.
 */
int FindNumBlocks(int nblks, int max_nblks);

/*!
 * \brief Compute the total number of feature elements (the product of all
 *        dimensions except the first).
 */
int64_t ComputeXLength(runtime::NDArray feat_array);

/*!
 * \brief Compute the total number of elements in the array.
 */
int64_t NElements(const runtime::NDArray& array);

/*!
 * \brief Compute the product of the elements of the given vector.
 */
int64_t Prod(const std::vector<int64_t>& vec);

/*!
 * \brief Fill the array with a constant value.
 */
template <int XPU, typename DType>
void Fill(const DLContext& ctx, DType* ptr, size_t length, DType val);

/*!
 * \brief Create a minigun CSR from two ndarrays (indptr and indices).
 */
template <typename Idx>
minigun::Csr<Idx> CreateCsr(runtime::NDArray indptr, runtime::NDArray indices) {
  minigun::Csr<Idx> csr;
  csr.row_offsets.data = static_cast<Idx*>(indptr->data);
  csr.row_offsets.length = indptr->shape[0];
  csr.column_indices.data = static_cast<Idx*>(indices->data);
  csr.column_indices.length = indices->shape[0];
  return csr;
}

}  // namespace utils
}  // namespace kernel
}  // namespace dgl
#endif  // DGL_KERNEL_UTILS_H_
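`CreateCsr` is zero-copy: it points the minigun CSR at the existing indptr/indices buffers rather than copying them. A rough scipy analogue of the same construction (`create_csr` and `num_cols` are hypothetical names for this sketch; scipy likewise reuses the arrays you pass in):

import numpy as np
import scipy.sparse as sp

def create_csr(indptr, indices, num_cols):
    """Build a CSR matrix over existing indptr/indices arrays."""
    data = np.ones(len(indices), dtype=np.float32)  # unit edge weights
    num_rows = len(indptr) - 1
    return sp.csr_matrix((data, indices, indptr), shape=(num_rows, num_cols))

# A 3-node graph with edges 0->1, 0->2 and 2->0:
csr = create_csr(np.array([0, 2, 2, 3]), np.array([1, 2, 0]), num_cols=3)
assert csr.nnz == 3 and csr[0, 1] == 1.0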
@@ -7,10 +7,10 @@
 #include <cassert>
 #include "cuda_hashtable.cuh"
-#include "../../kernel/cuda/atomic.cuh"
+#include "../../array/cuda/atomic.cuh"
 #include "../../array/cuda/dgl_cub.cuh"
-using namespace dgl::kernel::cuda;
+using namespace dgl::aten::cuda;
 namespace dgl {
 namespace runtime {
...
@@ -29,7 +29,6 @@
 #include "cuda_common.h"
 #include "../../runtime/workspace.h"
 #include "../../partition/ndarray_partition.h"
-#include "../../kernel/cuda/atomic.cuh"
 #include "../../array/cuda/dgl_cub.cuh"
 #include "../../array/cuda/array_index_select.cuh"
@@ -44,7 +43,6 @@
 namespace dgl {
-using namespace kernel::cuda;
 using namespace partition;
 namespace runtime {
...
import backend as F
import numpy as np
from numpy.testing import assert_array_equal, assert_allclose
import scipy as sp
import operator
import dgl
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
from dgl import utils
import dgl.function as fn
from functools import partial
import itertools
import unittest
def generate_rand_graph(n, connect_more=False, complete=False, add_self_loop=False):
    if complete:
        cord = [(i, j)
                for i, j in itertools.product(range(n), range(n)) if i != j]
        row = [t[0] for t in cord]
        col = [t[1] for t in cord]
        data = np.ones((len(row),))
        arr = sp.sparse.coo_matrix((data, (row, col)), shape=(n, n))
    else:
        arr = (sp.sparse.random(n, n, density=0.1,
                                format='coo') != 0).astype(np.int64)
    # having one node to connect to all other nodes.
    if connect_more:
        arr[0] = 1
        arr[:, 0] = 1
    if add_self_loop:
        g = dgl.DGLGraphStale(arr, readonly=False)
        nodes = np.arange(g.number_of_nodes())
        g.add_edges(nodes, nodes)
        g.readonly()
    else:
        g = dgl.DGLGraphStale(arr, readonly=True)
    g.ndata['h1'] = F.randn((g.number_of_nodes(), 10))
    g.edata['h2'] = F.randn((g.number_of_edges(), 3))
    return g
def create_mini_batch(g, num_hops, add_self_loop=False):
    seed_ids = np.array([1, 2, 0, 3])
    sampler = NeighborSampler(g, batch_size=4, expand_factor=g.number_of_nodes(),
                              num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop)
    nfs = list(sampler)
    assert len(nfs) == 1
    assert_array_equal(F.asnumpy(nfs[0].layer_parent_nid(-1)), seed_ids)
    return nfs[0]
def check_basic(g, nf):
    num_nodes = 0
    for i in range(nf.num_layers):
        num_nodes += nf.layer_size(i)
    assert nf.number_of_nodes() == num_nodes
    num_edges = 0
    for i in range(nf.num_blocks):
        num_edges += nf.block_size(i)
    assert nf.number_of_edges() == num_edges
    assert len(nf) == num_nodes
    assert nf.is_readonly
    assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes)))))
    for i in range(num_nodes):
        assert nf.has_node(i)
    assert np.all(F.asnumpy(nf.has_nodes(
        list(range(num_nodes, 2 * num_nodes)))) == 0)
    for i in range(num_nodes, 2 * num_nodes):
        assert not nf.has_node(i)
    for block_id in range(nf.num_blocks):
        u, v, eid = nf.block_edges(block_id)
        assert np.all(F.asnumpy(nf.has_edges_between(u, v)))

    deg = nf.layer_in_degree(0)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(0)), np.int64))
    deg = nf.layer_out_degree(-1)
    assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(-1)), np.int64))

    nf.copy_from_parent()
    for i in range(1, nf.num_layers):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))
        nids = nf.layer_nid(i)
        parent_nids = nf.map_to_parent_nid(nids)
        nids1 = nf.map_from_parent_nid(i, parent_nids)
        assert_array_equal(F.asnumpy(nids), F.asnumpy(nids1))
        data = nf.layers[i].data['h1']
        data1 = g.nodes[nf.layer_parent_nid(i)].data['h1']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))
    for i in range(nf.num_blocks):
        data = nf.blocks[i].data['h2']
        data1 = g.edges[nf.block_parent_eid(i)].data['h2']
        assert_array_equal(F.asnumpy(data), F.asnumpy(data1))

    # negative layer ids.
    for i in range(-1, -nf.num_layers, -1):
        in_deg = nf.layer_in_degree(i)
        out_deg = nf.layer_out_degree(i - 1)
        assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))
def test_basic():
    num_layers = 2
    g = generate_rand_graph(100, connect_more=True)
    nf = create_full_nodeflow(g, num_layers)
    assert nf.number_of_nodes() == g.number_of_nodes() * (num_layers + 1)
    assert nf.number_of_edges() == g.number_of_edges() * num_layers
    assert nf.num_layers == num_layers + 1
    assert nf.layer_size(0) == g.number_of_nodes()
    assert nf.layer_size(1) == g.number_of_nodes()
    check_basic(g, nf)

    parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
    nids = nf.map_from_parent_nid(0, parent_nids, remap_local=True)
    assert_array_equal(F.asnumpy(nids), F.asnumpy(parent_nids))

    # should also work for negative layer ids
    for l in range(-1, -num_layers, -1):
        nids1 = nf.map_from_parent_nid(l, parent_nids, remap_local=True)
        nids2 = nf.map_from_parent_nid(
            l + num_layers, parent_nids, remap_local=True)
        assert_array_equal(F.asnumpy(nids1), F.asnumpy(nids2))

    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)

    g = generate_rand_graph(100, add_self_loop=True)
    nf = create_mini_batch(g, num_layers, add_self_loop=True)
    assert nf.num_layers == num_layers + 1
    check_basic(g, nf)
def check_apply_nodes(create_node_flow, use_negative_block_id):
    num_layers = 2
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        g = generate_rand_graph(100)
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.layer_size(l), 5))

        def update_func(nodes):
            return {'h1': new_feats}
        nf.apply_layer(l, update_func)
        assert_array_equal(
            F.asnumpy(nf.layers[l].data['h1']), F.asnumpy(new_feats))

        new_feats = F.randn((4, 5))

        def update_func1(nodes):
            return {'h1': new_feats}
        nf.apply_layer(l, update_func1, v=nf.layer_nid(l)[0:4])
        assert_array_equal(
            F.asnumpy(nf.layers[l].data['h1'][0:4]), F.asnumpy(new_feats))


def test_apply_nodes():
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=False)
    check_apply_nodes(create_mini_batch, use_negative_block_id=False)
    check_apply_nodes(create_full_nodeflow, use_negative_block_id=True)
    check_apply_nodes(create_mini_batch, use_negative_block_id=True)
def check_apply_edges(create_node_flow):
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}
        nf.apply_block(i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.apply_block(-num_layers + i, update_func)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))

        # test built-in
        nf.apply_block(i, fn.u_add_v('f', 'f', 'f2'))
        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))
def check_apply_edges1(create_node_flow):
    num_layers = 2
    for i in range(num_layers):
        g = generate_rand_graph(100)
        g.ndata["f"] = F.randn((100, 10))
        nf = create_node_flow(g, num_layers)
        nf.copy_from_parent()
        new_feats = F.randn((nf.block_size(i), 5))

        def update_func(edges):
            return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}
        nf.register_apply_edge_func(update_func, i)
        nf.apply_block(i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        # should also work for negative block ids
        nf.register_apply_edge_func(update_func, -num_layers + i)
        nf.apply_block(-num_layers + i)
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))

        eids = nf.block_parent_eid(i)
        srcs, dsts = g.find_edges(eids)
        expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
        #expected_f_sum = g.ndata["f"][srcs] + g.ndata["f"][dsts]
        assert_array_equal(
            F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))


def test_apply_edges():
    check_apply_edges(create_full_nodeflow)
    check_apply_edges(create_mini_batch)
    check_apply_edges1(create_mini_batch)
def check_flow_compute(create_node_flow, use_negative_block_id=False):
    num_layers = 2
    g = generate_rand_graph(100)
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    # Test the computation on a layer at a time.
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # Test the computation when only a few nodes are active in a layer.
    g.ndata['h'] = g.ndata['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        vs = nf.layer_nid(i+1)[0:4]
        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h': nodes.data['t'] + 1}, v=vs)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        data1 = nf.layers[i + 1].data['h'][0:4]
        data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
        assert_allclose(F.asnumpy(data1), F.asnumpy(
            data2), rtol=1e-4, atol=1e-4)
def check_flow_compute1(create_node_flow, use_negative_block_id=False):
    num_layers = 2
    g = generate_rand_graph(100)

    # test the case that we register UDFs per block.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.register_message_func(fn.copy_src(src='h', out='m'), l)
        nf.register_reduce_func(fn.sum(msg='m', out='t'), l)
        nf.register_apply_node_func(
            lambda nodes: {'h': nodes.data['t'] + 1}, l)
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    # test the case that we register UDFs in all blocks.
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    nf.register_message_func(fn.copy_src(src='h', out='m'))
    nf.register_reduce_func(fn.sum(msg='m', out='t'))
    nf.register_apply_node_func(lambda nodes: {'h': nodes.data['t'] + 1})
    for i in range(num_layers):
        l = -num_layers + i if use_negative_block_id else i
        nf.block_compute(l)
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)
class SrcMulEdgeMessageFunction(object):
    def __init__(self, src_field, edge_field, out_field):
        self.mul_op = operator.mul
        self.src_field = src_field
        self.edge_field = edge_field
        self.out_field = out_field

    def __call__(self, edges):
        sdata = edges.src[self.src_field]
        edata = edges.data[self.edge_field]
        # Due to the different broadcasting semantics of different backends,
        # we need to broadcast the sdata and edata to be of the same rank.
        rank = max(F.ndim(sdata), F.ndim(edata))
        sshape = F.shape(sdata)
        eshape = F.shape(edata)
        sdata = F.reshape(sdata, sshape + (1,) * (rank - F.ndim(sdata)))
        edata = F.reshape(edata, eshape + (1,) * (rank - F.ndim(edata)))
        ret = self.mul_op(sdata, edata)
        return {self.out_field: ret}
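

# The rank-alignment trick in __call__ above can be checked in isolation:
# appending singleton axes makes tensors of different rank broadcastable
# while leaving the leading (per-edge) dimension alone. A minimal NumPy
# sketch; this helper is illustrative only and not used by the tests.
def _broadcast_rank_demo():
    sdata = np.random.randn(5, 10)   # e.g. per-edge source features
    edata = np.random.randn(5)       # e.g. one scalar weight per edge
    rank = max(sdata.ndim, edata.ndim)
    sdata = sdata.reshape(sdata.shape + (1,) * (rank - sdata.ndim))
    edata = edata.reshape(edata.shape + (1,) * (rank - edata.ndim))  # -> (5, 1)
    assert (sdata * edata).shape == (5, 10)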
def check_flow_compute2(create_node_flow):
    num_layers = 2
    g = generate_rand_graph(100)
    g.edata['h'] = F.ones((g.number_of_edges(), 10))
    nf = create_node_flow(g, num_layers)
    nf.copy_from_parent()
    g.ndata['h'] = g.ndata['h1']
    nf.layers[0].data['h'] = nf.layers[0].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, SrcMulEdgeMessageFunction(
            'h', 'h', 't'), fn.sum('t', 'h1'))
        nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
                        F.asnumpy(nf.layers[i + 1].data['h']),
                        rtol=1e-4, atol=1e-4)
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)

    nf = create_node_flow(g, num_layers)
    g.ndata['h'] = g.ndata['h1']
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        nf.layers[i].data['h'] = nf.layers[i].data['h1']
    for i in range(num_layers):
        nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['s']),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['s']),
                        rtol=1e-4, atol=1e-4)


def test_flow_compute():
    check_flow_compute(create_full_nodeflow)
    check_flow_compute(create_mini_batch)
    check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
    check_flow_compute(create_mini_batch, use_negative_block_id=True)
    check_flow_compute1(create_mini_batch)
    check_flow_compute1(create_mini_batch, use_negative_block_id=True)
    check_flow_compute2(create_mini_batch)
def check_prop_flows(create_node_flow):
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    nf2.copy_from_parent()
    # Test the computation on a layer at a time.
    for i in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
    # Test the computation on all layers.
    nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                  lambda nodes: {'h': nodes.data['t'] + 1})
    assert_allclose(F.asnumpy(nf2.layers[-1].data['h']),
                    F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h']),
                    rtol=1e-4, atol=1e-4)


def test_prop_flows():
    check_prop_flows(create_full_nodeflow)
    check_prop_flows(create_mini_batch)
@unittest.skipIf(dgl.backend.backend_name == "tensorflow",
                 reason="TF doesn't support inplace update, nf.copy_to_parent will trigger this")
def test_copy():
    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf = create_mini_batch(g, num_layers)
    nf.copy_from_parent()
    for i in range(nf.num_layers):
        assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
        for key in g.ndata.keys():
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
        for key in g.edata.keys():
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    node_embed_names = [['h'], ['h1'], ['h']]
    edge_embed_names = [['h2'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=edge_embed_names)
    for i in range(nf.num_layers):
        assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
        for key in node_embed_names[i]:
            assert key in nf.layers[i].data.keys()
            assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
                               F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
    for i in range(nf.num_blocks):
        assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
        for key in edge_embed_names[i]:
            assert key in nf.blocks[i].data.keys()
            assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
                               F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], [], []]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)
    for i in range(num_layers):
        nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'), fn.sum(msg='m', out='t'),
                         lambda nodes: {'h%d' % (i+1): nodes.data['t'] + 1})
        g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                     lambda nodes: {'h': nodes.data['t'] + 1})
        assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i+1)]),
                        F.asnumpy(
                            g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
                        rtol=1e-4, atol=1e-4)
    nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
    for i in range(num_layers + 1):
        assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
                           F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))

    nf = create_mini_batch(g, num_layers)
    g.ndata['h0'] = F.clone(g.ndata['h'])
    g.ndata['h1'] = F.clone(g.ndata['h'])
    g.ndata['h2'] = F.clone(g.ndata['h'])
    node_embed_names = [['h0'], ['h1'], ['h2']]
    nf.copy_from_parent(node_embed_names=node_embed_names,
                        edge_embed_names=None)

    def msg_func(edge, ind):
        assert 'h%d' % ind in edge.src.keys()
        return {'m': edge.src['h%d' % ind]}

    def reduce_func(node, ind):
        assert 'h%d' % (ind + 1) in node.data.keys()
        return {'h': F.sum(node.mailbox['m'], 1) + node.data['h%d' % (ind + 1)]}

    for i in range(num_layers):
        nf.block_compute(i, partial(msg_func, ind=i),
                         partial(reduce_func, ind=i))
def test_block_edges():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        dest_nodes = utils.toindex(nf.layer_nid(i + 1))
        src1, dst1, eid1 = nf.in_edges(dest_nodes, 'all')
        src, dst, eid = nf.block_edges(i)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid1))

        src, dst, eid = nf.block_edges(i, remap_local=True)
        # should also work for negative block ids
        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i,
                                                            remap_local=True)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src_by_neg))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst_by_neg))
        assert_array_equal(F.asnumpy(eid), F.asnumpy(eid_by_neg))

        src1 = nf._glb2lcl_nid(src1, i)
        dst1 = nf._glb2lcl_nid(dst1, i + 1)
        assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
        assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
def test_block_adj_matrix():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        u, v, _ = nf.block_edges(i, remap_local=True)
        adj, _ = nf.block_adjacency_matrix(i, F.cpu())
        adj = F.sparse_to_numpy(adj)
        # should also work for negative block ids
        adj_by_neg, _ = nf.block_adjacency_matrix(-nf.num_blocks + i, F.cpu())
        adj_by_neg = F.sparse_to_numpy(adj_by_neg)
        data = np.ones((len(u)), dtype=np.float32)
        v = utils.toindex(v)
        u = utils.toindex(u)
        coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
                                   shape=adj.shape).todense()
        assert_array_equal(adj, coo)
        assert_array_equal(adj_by_neg, coo)
def test_block_incidence_matrix():
    num_layers = 3
    g = generate_rand_graph(100)
    nf = create_mini_batch(g, num_layers)
    assert nf.num_layers == num_layers + 1
    for i in range(nf.num_blocks):
        typestrs = ["in", "out"]  # todo need fix for "both"
        adjs = []
        for typestr in typestrs:
            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
            adj = F.sparse_to_numpy(adj)
            adjs.append(adj)
        # should work for negative block ids
        adjs_by_neg = []
        for typestr in typestrs:
            adj_by_neg, _ = nf.block_incidence_matrix(
                -nf.num_blocks + i, typestr, F.cpu())
            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
            adjs_by_neg.append(adj_by_neg)
        u, v, e = nf.block_edges(i, remap_local=True)
        u = utils.toindex(u)
        v = utils.toindex(v)
        e = utils.toindex(e)
        expected = []
        data_in_and_out = np.ones((len(u)), dtype=np.float32)
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
                                 shape=adjs[0].shape).todense()
        )
        expected.append(
            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
                                 shape=adjs[1].shape).todense()
        )
        # use j to avoid shadowing the block index i
        for j in range(len(typestrs)):
            assert_array_equal(adjs[j], expected[j])
            assert_array_equal(adjs_by_neg[j], expected[j])
if __name__ == '__main__':
    test_basic()
    test_block_adj_matrix()
    test_copy()
    test_apply_nodes()
    test_apply_edges()
    test_flow_compute()
    test_prop_flows()
    test_block_edges()
    test_block_incidence_matrix()