Unverified Commit fc7775a2 authored by Da Zheng's avatar Da Zheng Committed by GitHub
Browse files

Define node/edge Ids in NodeFlow more clearly (#628)

* add tests.

* distinguish layer-local nid and nodeflow nid.

* use numpy assert_array_equal and assert_allclose

* fix map_from_parent_nid

* fix test

* fix test.

* renmae remap.

* update doc.

* update doc.

* update doc.

* fix test.

* fix test.
parent dec8b49b
...@@ -111,7 +111,8 @@ class GraphSAGETrain(gluon.Block): ...@@ -111,7 +111,8 @@ class GraphSAGETrain(gluon.Block):
for i, layer in enumerate(self.layers): for i, layer in enumerate(self.layers):
parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1)) parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1))
layer_nid = nf.map_from_parent_nid(i, parent_nid).as_in_context(h.context) layer_nid = nf.map_from_parent_nid(i, parent_nid,
remap_local=True).as_in_context(h.context)
self_h = h[layer_nid] self_h = h[layer_nid]
# activation from previous layer of myself, used in graphSAGE # activation from previous layer of myself, used in graphSAGE
nf.layers[i+1].data['self_h'] = self_h nf.layers[i+1].data['self_h'] = self_h
...@@ -165,7 +166,8 @@ class GraphSAGEInfer(gluon.Block): ...@@ -165,7 +166,8 @@ class GraphSAGEInfer(gluon.Block):
for i, layer in enumerate(self.layers): for i, layer in enumerate(self.layers):
nf.layers[i].data['h'] = h nf.layers[i].data['h'] = h
parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1)) parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1))
layer_nid = nf.map_from_parent_nid(i, parent_nid).as_in_context(h.context) layer_nid = nf.map_from_parent_nid(i, parent_nid,
remap_local=True).as_in_context(h.context)
# activation from previous layer of the nodes in (i+1)-th layer, used in graphSAGE # activation from previous layer of the nodes in (i+1)-th layer, used in graphSAGE
self_h = h[layer_nid] self_h = h[layer_nid]
nf.layers[i+1].data['self_h'] = self_h nf.layers[i+1].data['self_h'] = self_h
......
...@@ -85,8 +85,8 @@ class NodeFlow(DGLBaseGraph): ...@@ -85,8 +85,8 @@ class NodeFlow(DGLBaseGraph):
def _get_node_frame(self, layer_id): def _get_node_frame(self, layer_id):
return self._node_frames[layer_id] return self._node_frames[layer_id]
def _get_edge_frame(self, flow_id): def _get_edge_frame(self, block_id):
return self._edge_frames[flow_id] return self._edge_frames[block_id]
@property @property
def num_layers(self): def num_layers(self):
...@@ -116,7 +116,6 @@ class NodeFlow(DGLBaseGraph): ...@@ -116,7 +116,6 @@ class NodeFlow(DGLBaseGraph):
This is mainly for usage like: This is mainly for usage like:
* `g.layers[2].data['h']` to get the node features of layer#2. * `g.layers[2].data['h']` to get the node features of layer#2.
* `g.layers(2)` to get the nodes of layer#2.
""" """
return LayerView(self) return LayerView(self)
...@@ -125,8 +124,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -125,8 +124,7 @@ class NodeFlow(DGLBaseGraph):
"""Return a BlockView of this NodeFlow. """Return a BlockView of this NodeFlow.
This is mainly for usage like: This is mainly for usage like:
* `g.blocks[1,2].data['h']` to get the edge features of blocks from layer#1 to layer#2. * `g.blocks[1].data['h']` to get the edge features of blocks from layer#1 to layer#2.
* `g.blocks(1, 2)` to get the edge ids of blocks #1->#2.
""" """
return BlockView(self) return BlockView(self)
...@@ -197,6 +195,16 @@ class NodeFlow(DGLBaseGraph): ...@@ -197,6 +195,16 @@ class NodeFlow(DGLBaseGraph):
def copy_to_parent(self, node_embed_names=ALL, edge_embed_names=ALL): def copy_to_parent(self, node_embed_names=ALL, edge_embed_names=ALL):
"""Copy node/edge embeddings to the parent graph. """Copy node/edge embeddings to the parent graph.
Note: if a node in the parent graph appears in multiple layers and they
in the NodeFlow has node data with the same name, the data of this node
in the lower layer will overwrite the node data in previous layer.
For example, node 5 in the parent graph appears in layer 0 and 1 and
they have the same node data 'h'. The node data in layer 1 of this node
will overwrite its data in layer 0 when copying the data back.
To avoid this, users can give node data in each layer a different name.
Parameters Parameters
---------- ----------
node_embed_names : a list of lists of strings, optional node_embed_names : a list of lists of strings, optional
...@@ -265,15 +273,20 @@ class NodeFlow(DGLBaseGraph): ...@@ -265,15 +273,20 @@ class NodeFlow(DGLBaseGraph):
eid = utils.toindex(eid) eid = utils.toindex(eid)
return self._edge_mapping.tousertensor()[eid.tousertensor()] return self._edge_mapping.tousertensor()[eid.tousertensor()]
def map_from_parent_nid(self, layer_id, parent_nids): def map_from_parent_nid(self, layer_id, parent_nids, remap_local=False):
"""Map parent node Ids to NodeFlow node Ids in a certain layer. """Map parent node Ids to NodeFlow node Ids in a certain layer.
If `remap_local` is True, it returns the node Ids local to the layer.
Otherwise, the node Ids are unique in the NodeFlow.
Parameters Parameters
---------- ----------
layer_id : int layer_id : int
The layer Id. The layer Id.
parent_nids: list or Tensor parent_nids: list or Tensor
Node Ids in the parent graph. Node Ids in the parent graph.
remap_local: boolean
Remap layer/block-level local Id if True; otherwise, NodeFlow-level Id.
Returns Returns
------- -------
...@@ -290,7 +303,10 @@ class NodeFlow(DGLBaseGraph): ...@@ -290,7 +303,10 @@ class NodeFlow(DGLBaseGraph):
mapping = mapping[start:end] mapping = mapping[start:end]
mapping = utils.toindex(mapping) mapping = utils.toindex(mapping)
nflow_ids = transform_ids(mapping, parent_nids) nflow_ids = transform_ids(mapping, parent_nids)
return nflow_ids.tousertensor() if remap_local:
return nflow_ids.tousertensor()
else:
return nflow_ids.tousertensor() + int(self._layer_offsets[layer_id])
def layer_in_degree(self, layer_id): def layer_in_degree(self, layer_id):
"""Return the in-degree of the nodes in the specified layer. """Return the in-degree of the nodes in the specified layer.
...@@ -327,6 +343,8 @@ class NodeFlow(DGLBaseGraph): ...@@ -327,6 +343,8 @@ class NodeFlow(DGLBaseGraph):
def layer_nid(self, layer_id): def layer_nid(self, layer_id):
"""Get the node Ids in the specified layer. """Get the node Ids in the specified layer.
The returned node Ids are unique in the NodeFlow.
Parameters Parameters
---------- ----------
layer_id : int layer_id : int
...@@ -335,7 +353,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -335,7 +353,7 @@ class NodeFlow(DGLBaseGraph):
Returns Returns
------- -------
Tensor Tensor
The node id array. The node ids.
""" """
layer_id = self._get_layer_id(layer_id) layer_id = self._get_layer_id(layer_id)
assert layer_id + 1 < len(self._layer_offsets) assert layer_id + 1 < len(self._layer_offsets)
...@@ -367,6 +385,8 @@ class NodeFlow(DGLBaseGraph): ...@@ -367,6 +385,8 @@ class NodeFlow(DGLBaseGraph):
def block_eid(self, block_id): def block_eid(self, block_id):
"""Get the edge Ids in the specified block. """Get the edge Ids in the specified block.
The returned edge Ids are unique in the NodeFlow.
Parameters Parameters
---------- ----------
block_id : int block_id : int
...@@ -375,7 +395,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -375,7 +395,7 @@ class NodeFlow(DGLBaseGraph):
Returns Returns
------- -------
Tensor Tensor
The edge id array. The edge ids of the block in the NodeFlow.
""" """
block_id = self._get_block_id(block_id) block_id = self._get_block_id(block_id)
start = self._block_offsets[block_id] start = self._block_offsets[block_id]
...@@ -393,7 +413,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -393,7 +413,7 @@ class NodeFlow(DGLBaseGraph):
Returns Returns
------- -------
Tensor Tensor
The parent edge id array. The edge ids of the block in the parent graph.
""" """
block_id = self._get_block_id(block_id) block_id = self._get_block_id(block_id)
start = self._block_offsets[block_id] start = self._block_offsets[block_id]
...@@ -404,18 +424,19 @@ class NodeFlow(DGLBaseGraph): ...@@ -404,18 +424,19 @@ class NodeFlow(DGLBaseGraph):
assert F.asnumpy(F.sum(ret == -1, 0)) == 0, "The eid in the parent graph is invalid." assert F.asnumpy(F.sum(ret == -1, 0)) == 0, "The eid in the parent graph is invalid."
return ret return ret
def block_edges(self, block_id, remap=False): def block_edges(self, block_id, remap_local=False):
"""Return the edges in a block. """Return the edges in a block.
If remap is True, returned indices u, v, eid will be remapped to local If remap_local is True, returned indices u, v, eid will be remapped to local
indices (i.e. starting from 0) Ids (i.e. starting from 0) in the block or in the layer. Otherwise,
u, v, eid are unique in the NodeFlow.
Parameters Parameters
---------- ----------
block_id : int block_id : int
The specified block to return the edges. The specified block to return the edges.
remap : boolean remap_local : boolean
Remap indices if True Remap layer/block-level local Id if True; otherwise, NodeFlow-level Id.
Returns Returns
------- -------
...@@ -432,7 +453,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -432,7 +453,7 @@ class NodeFlow(DGLBaseGraph):
int(layer0_size), int(layer0_size),
int(self._layer_offsets[block_id + 1]), int(self._layer_offsets[block_id + 1]),
int(self._layer_offsets[block_id + 2]), int(self._layer_offsets[block_id + 2]),
remap) remap_local)
idx = utils.toindex(rst(0)).tousertensor() idx = utils.toindex(rst(0)).tousertensor()
eid = utils.toindex(rst(1)) eid = utils.toindex(rst(1))
num_edges = int(len(idx) / 2) num_edges = int(len(idx) / 2)
...@@ -498,17 +519,14 @@ class NodeFlow(DGLBaseGraph): ...@@ -498,17 +519,14 @@ class NodeFlow(DGLBaseGraph):
value indicating whether the edge is incident to the node value indicating whether the edge is incident to the node
or not. or not.
There are three types of an incidence matrix `I`: There are two types of an incidence matrix `I`:
* "in": * "in":
- I[v, e] = 1 if e is the in-edge of v (or v is the dst node of e); - I[v, e] = 1 if e is the in-edge of v (or v is the dst node of e);
- I[v, e] = 0 otherwise. - I[v, e] = 0 otherwise.
* "out": * "out":
- I[v, e] = 1 if e is the out-edge of v (or v is the src node of e); - I[v, e] = 1 if e is the out-edge of v (or v is the src node of e);
- I[v, e] = 0 otherwise. - I[v, e] = 0 otherwise.
* "both": "both" isn't defined in the block of a NodeFlow.
- I[v, e] = 1 if e is the in-edge of v;
- I[v, e] = -1 if e is the out-edge of v;
- I[v, e] = 0 otherwise (including self-loop).
Parameters Parameters
---------- ----------
...@@ -528,7 +546,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -528,7 +546,7 @@ class NodeFlow(DGLBaseGraph):
if shuffle is not required. if shuffle is not required.
""" """
block_id = self._get_block_id(block_id) block_id = self._get_block_id(block_id)
src, dst, eid = self.block_edges(block_id, remap=True) src, dst, eid = self.block_edges(block_id, remap_local=True)
src = F.copy_to(src, ctx) # the index of the ctx will be cached src = F.copy_to(src, ctx) # the index of the ctx will be cached
dst = F.copy_to(dst, ctx) # the index of the ctx will be cached dst = F.copy_to(dst, ctx) # the index of the ctx will be cached
eid = F.copy_to(eid, ctx) # the index of the ctx will be cached eid = F.copy_to(eid, ctx) # the index of the ctx will be cached
...@@ -550,23 +568,6 @@ class NodeFlow(DGLBaseGraph): ...@@ -550,23 +568,6 @@ class NodeFlow(DGLBaseGraph):
# FIXME(minjie): data type # FIXME(minjie): data type
dat = F.ones((m,), dtype=F.float32, ctx=ctx) dat = F.ones((m,), dtype=F.float32, ctx=ctx)
inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m)) inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
elif typestr == 'both':
# TODO does it work for bipartite graph?
# first remove entries for self loops
mask = F.logical_not(F.equal(src, dst))
src = F.boolean_mask(src, mask)
dst = F.boolean_mask(dst, mask)
eid = F.boolean_mask(eid, mask)
n_entries = F.shape(src)[0]
# create index
row = F.unsqueeze(F.cat([src, dst], dim=0), 0)
col = F.unsqueeze(F.cat([eid, eid], dim=0), 0)
idx = F.cat([row, col], dim=0)
# FIXME(minjie): data type
x = -F.ones((n_entries,), dtype=F.float32, ctx=ctx)
y = F.ones((n_entries,), dtype=F.float32, ctx=ctx)
dat = F.cat([x, y], dim=0)
inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
else: else:
raise DGLError('Invalid incidence matrix type: %s' % str(typestr)) raise DGLError('Invalid incidence matrix type: %s' % str(typestr))
return inc, shuffle_idx return inc, shuffle_idx
...@@ -718,7 +719,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -718,7 +719,7 @@ class NodeFlow(DGLBaseGraph):
Apply function on the nodes. The function should be Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`. a :mod:`Node UDF <dgl.udf>`.
v : a list of vertex Ids or ALL. v : a list of vertex Ids or ALL.
The vertices to run the node update function. The vertex Ids (unique in the NodeFlow) to run the node update function.
inplace : bool, optional inplace : bool, optional
If True, update will be done in place, but autograd will break. If True, update will be done in place, but autograd will break.
""" """
...@@ -750,7 +751,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -750,7 +751,7 @@ class NodeFlow(DGLBaseGraph):
Apply function on the edges. The function should be Apply function on the edges. The function should be
an :mod:`Edge UDF <dgl.udf>`. an :mod:`Edge UDF <dgl.udf>`.
edges : a list of edge Ids or ALL. edges : a list of edge Ids or ALL.
The edges to run the edge update function. The edges Id to run the edge update function.
inplace : bool, optional inplace : bool, optional
If True, update will be done in place, but autograd will break. If True, update will be done in place, but autograd will break.
""" """
...@@ -760,7 +761,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -760,7 +761,7 @@ class NodeFlow(DGLBaseGraph):
assert func is not None assert func is not None
if is_all(edges): if is_all(edges):
u, v, _ = self.block_edges(block_id, remap=True) u, v, _ = self.block_edges(block_id, remap_local=True)
u = utils.toindex(u) u = utils.toindex(u)
v = utils.toindex(v) v = utils.toindex(v)
eid = utils.toindex(slice(0, self.block_size(block_id))) eid = utils.toindex(slice(0, self.block_size(block_id)))
...@@ -818,7 +819,7 @@ class NodeFlow(DGLBaseGraph): ...@@ -818,7 +819,7 @@ class NodeFlow(DGLBaseGraph):
Apply function on the nodes. The function should be Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`. a :mod:`Node UDF <dgl.udf>`.
v : a list of vertex Ids or ALL. v : a list of vertex Ids or ALL.
The specified nodes in layer i+1 to run the computation. The Node Ids (unique in the NodeFlow) in layer block_id+1 to run the computation.
inplace: bool, optional inplace: bool, optional
If True, update will be done in place, but autograd will break. If True, update will be done in place, but autograd will break.
""" """
......
...@@ -536,7 +536,7 @@ def schedule_nodeflow_update_all(graph, ...@@ -536,7 +536,7 @@ def schedule_nodeflow_update_all(graph,
var_eid = var.IDX(eid) var_eid = var.IDX(eid)
# generate send + reduce # generate send + reduce
def uv_getter(): def uv_getter():
src, dst, _ = graph.block_edges(block_id, remap=True) src, dst, _ = graph.block_edges(block_id, remap_local=True)
return var.IDX(utils.toindex(src)), var.IDX(utils.toindex(dst)) return var.IDX(utils.toindex(src)), var.IDX(utils.toindex(dst))
adj_creator = lambda: spmv.build_gidx_and_mapping_block(graph, block_id) adj_creator = lambda: spmv.build_gidx_and_mapping_block(graph, block_id)
out_map_creator = lambda nbits: None out_map_creator = lambda nbits: None
......
...@@ -206,7 +206,7 @@ def build_gidx_and_mapping_block(graph, block_id, edge_tuples=None): ...@@ -206,7 +206,7 @@ def build_gidx_and_mapping_block(graph, block_id, edge_tuples=None):
Number of ints needed to represent the graph Number of ints needed to represent the graph
""" """
if edge_tuples is None: if edge_tuples is None:
u, v, eid = graph.block_edges(block_id, remap=True) u, v, eid = graph.block_edges(block_id, remap_local=True)
u = utils.toindex(u) u = utils.toindex(u)
v = utils.toindex(v) v = utils.toindex(v)
eid = utils.toindex(eid) eid = utils.toindex(eid)
......
import backend as F import backend as F
import numpy as np import numpy as np
from numpy.testing import assert_array_equal, assert_allclose
import scipy as sp import scipy as sp
import operator
import dgl import dgl
from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler from dgl.contrib.sampling.sampler import create_full_nodeflow, NeighborSampler
from dgl import utils from dgl import utils
...@@ -8,7 +10,6 @@ import dgl.function as fn ...@@ -8,7 +10,6 @@ import dgl.function as fn
from functools import partial from functools import partial
import itertools import itertools
def generate_rand_graph(n, connect_more=False, complete=False): def generate_rand_graph(n, connect_more=False, complete=False):
if complete: if complete:
cord = [(i,j) for i, j in itertools.product(range(n), range(n)) if i != j] cord = [(i,j) for i, j in itertools.product(range(n), range(n)) if i != j]
...@@ -36,7 +37,7 @@ def test_self_loop(): ...@@ -36,7 +37,7 @@ def test_self_loop():
for i in range(1, nf.num_layers): for i in range(1, nf.num_layers):
in_deg = nf.layer_in_degree(i) in_deg = nf.layer_in_degree(i)
deg = F.copy_to(F.ones(in_deg.shape, dtype=F.int64), F.cpu()) * n deg = F.copy_to(F.ones(in_deg.shape, dtype=F.int64), F.cpu()) * n
assert F.array_equal(in_deg, deg) assert_array_equal(F.asnumpy(in_deg), F.asnumpy(deg))
def create_mini_batch(g, num_hops, add_self_loop=False): def create_mini_batch(g, num_hops, add_self_loop=False):
seed_ids = np.array([1, 2, 0, 3]) seed_ids = np.array([1, 2, 0, 3])
...@@ -44,7 +45,7 @@ def create_mini_batch(g, num_hops, add_self_loop=False): ...@@ -44,7 +45,7 @@ def create_mini_batch(g, num_hops, add_self_loop=False):
num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop) num_hops=num_hops, seed_nodes=seed_ids, add_self_loop=add_self_loop)
nfs = list(sampler) nfs = list(sampler)
assert len(nfs) == 1 assert len(nfs) == 1
assert np.array_equal(F.asnumpy(nfs[0].layer_parent_nid(-1)), seed_ids) assert_array_equal(F.asnumpy(nfs[0].layer_parent_nid(-1)), seed_ids)
return nfs[0] return nfs[0]
def check_basic(g, nf): def check_basic(g, nf):
...@@ -56,16 +57,47 @@ def check_basic(g, nf): ...@@ -56,16 +57,47 @@ def check_basic(g, nf):
for i in range(nf.num_blocks): for i in range(nf.num_blocks):
num_edges += nf.block_size(i) num_edges += nf.block_size(i)
assert nf.number_of_edges() == num_edges assert nf.number_of_edges() == num_edges
assert len(nf) == num_nodes
assert nf.is_readonly
assert not nf.is_multigraph
assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes)))))
for i in range(num_nodes):
assert nf.has_node(i)
assert np.all(F.asnumpy(nf.has_nodes(list(range(num_nodes, 2 * num_nodes)))) == 0)
for i in range(num_nodes, 2 * num_nodes):
assert not nf.has_node(i)
for block_id in range(nf.num_blocks):
u, v, eid = nf.block_edges(block_id)
assert np.all(F.asnumpy(nf.has_edges_between(u, v)))
deg = nf.layer_in_degree(0) deg = nf.layer_in_degree(0)
assert F.array_equal(deg, F.copy_to(F.zeros((nf.layer_size(0)), F.int64), F.cpu())) assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(0)), np.int64))
deg = nf.layer_out_degree(-1) deg = nf.layer_out_degree(-1)
assert F.array_equal(deg, F.copy_to(F.zeros((nf.layer_size(-1)), F.int64), F.cpu())) assert_array_equal(F.asnumpy(deg), np.zeros((nf.layer_size(-1)), np.int64))
nf.copy_from_parent()
for i in range(1, nf.num_layers): for i in range(1, nf.num_layers):
in_deg = nf.layer_in_degree(i) in_deg = nf.layer_in_degree(i)
out_deg = nf.layer_out_degree(i - 1) out_deg = nf.layer_out_degree(i - 1)
assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0)) assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))
nids = nf.layer_nid(i)
parent_nids = nf.map_to_parent_nid(nids)
nids1 = nf.map_from_parent_nid(i, parent_nids)
assert_array_equal(F.asnumpy(nids), F.asnumpy(nids1))
data = nf.layers[i].data['h1']
data1 = g.nodes[nf.layer_parent_nid(i)].data['h1']
assert_array_equal(F.asnumpy(data), F.asnumpy(data1))
for i in range(nf.num_blocks):
data = nf.blocks[i].data['h2']
data1 = g.edges[nf.block_parent_eid(i)].data['h2']
assert_array_equal(F.asnumpy(data), F.asnumpy(data1))
# negative layer Ids. # negative layer Ids.
for i in range(-1, -nf.num_layers, -1): for i in range(-1, -nf.num_layers, -1):
in_deg = nf.layer_in_degree(i) in_deg = nf.layer_in_degree(i)
...@@ -85,14 +117,14 @@ def test_basic(): ...@@ -85,14 +117,14 @@ def test_basic():
check_basic(g, nf) check_basic(g, nf)
parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu()) parent_nids = F.copy_to(F.arange(0, g.number_of_nodes()), F.cpu())
nids = nf.map_from_parent_nid(0, parent_nids) nids = nf.map_from_parent_nid(0, parent_nids, remap_local=True)
assert F.array_equal(nids, parent_nids) assert_array_equal(F.asnumpy(nids), F.asnumpy(parent_nids))
# should also work for negative layer ids # should also work for negative layer ids
for l in range(-1, -num_layers, -1): for l in range(-1, -num_layers, -1):
nids1 = nf.map_from_parent_nid(l, parent_nids) nids1 = nf.map_from_parent_nid(l, parent_nids, remap_local=True)
nids2 = nf.map_from_parent_nid(l + num_layers, parent_nids) nids2 = nf.map_from_parent_nid(l + num_layers, parent_nids, remap_local=True)
assert F.array_equal(nids1, nids2) assert_array_equal(F.asnumpy(nids1), F.asnumpy(nids2))
g = generate_rand_graph(100) g = generate_rand_graph(100)
nf = create_mini_batch(g, num_layers) nf = create_mini_batch(g, num_layers)
...@@ -111,13 +143,13 @@ def check_apply_nodes(create_node_flow, use_negative_block_id): ...@@ -111,13 +143,13 @@ def check_apply_nodes(create_node_flow, use_negative_block_id):
def update_func(nodes): def update_func(nodes):
return {'h1' : new_feats} return {'h1' : new_feats}
nf.apply_layer(l, update_func) nf.apply_layer(l, update_func)
assert F.array_equal(nf.layers[l].data['h1'], new_feats) assert_array_equal(F.asnumpy(nf.layers[l].data['h1']), F.asnumpy(new_feats))
new_feats = F.randn((4, 5)) new_feats = F.randn((4, 5))
def update_func1(nodes): def update_func1(nodes):
return {'h1' : new_feats} return {'h1' : new_feats}
nf.apply_layer(l, update_func1, v=nf.layer_nid(l)[0:4]) nf.apply_layer(l, update_func1, v=nf.layer_nid(l)[0:4])
assert F.array_equal(nf.layers[l].data['h1'][0:4], new_feats) assert_array_equal(F.asnumpy(nf.layers[l].data['h1'][0:4]), F.asnumpy(new_feats))
def test_apply_nodes(): def test_apply_nodes():
...@@ -140,16 +172,16 @@ def check_apply_edges(create_node_flow): ...@@ -140,16 +172,16 @@ def check_apply_edges(create_node_flow):
return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]} return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}
nf.apply_block(i, update_func) nf.apply_block(i, update_func)
assert F.array_equal(nf.blocks[i].data['h2'], new_feats) assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))
# should also work for negative block ids # should also work for negative block ids
nf.apply_block(-num_layers + i, update_func) nf.apply_block(-num_layers + i, update_func)
assert F.array_equal(nf.blocks[i].data['h2'], new_feats) assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))
eids = nf.block_parent_eid(i) eids = nf.block_parent_eid(i)
srcs, dsts = g.find_edges(eids) srcs, dsts = g.find_edges(eids)
expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"] expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
assert F.array_equal(nf.blocks[i].data['f2'], expected_f_sum) assert_array_equal(F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))
def check_apply_edges1(create_node_flow): def check_apply_edges1(create_node_flow):
...@@ -166,18 +198,18 @@ def check_apply_edges1(create_node_flow): ...@@ -166,18 +198,18 @@ def check_apply_edges1(create_node_flow):
nf.register_apply_edge_func(update_func, i) nf.register_apply_edge_func(update_func, i)
nf.apply_block(i) nf.apply_block(i)
assert F.array_equal(nf.blocks[i].data['h2'], new_feats) assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))
# should also work for negative block ids # should also work for negative block ids
nf.register_apply_edge_func(update_func, -num_layers + i) nf.register_apply_edge_func(update_func, -num_layers + i)
nf.apply_block(-num_layers + i) nf.apply_block(-num_layers + i)
assert F.array_equal(nf.blocks[i].data['h2'], new_feats) assert_array_equal(F.asnumpy(nf.blocks[i].data['h2']), F.asnumpy(new_feats))
eids = nf.block_parent_eid(i) eids = nf.block_parent_eid(i)
srcs, dsts = g.find_edges(eids) srcs, dsts = g.find_edges(eids)
expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"] expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
#expected_f_sum = g.ndata["f"][srcs] + g.ndata["f"][dsts] #expected_f_sum = g.ndata["f"][srcs] + g.ndata["f"][dsts]
assert F.array_equal(nf.blocks[i].data['f2'], expected_f_sum) assert_array_equal(F.asnumpy(nf.blocks[i].data['f2']), F.asnumpy(expected_f_sum))
def test_apply_edges(): def test_apply_edges():
...@@ -200,7 +232,9 @@ def check_flow_compute(create_node_flow, use_negative_block_id=False): ...@@ -200,7 +232,9 @@ def check_flow_compute(create_node_flow, use_negative_block_id=False):
lambda nodes: {'h' : nodes.data['t'] + 1}) lambda nodes: {'h' : nodes.data['t'] + 1})
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'), g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h' : nodes.data['t'] + 1}) lambda nodes: {'h' : nodes.data['t'] + 1})
assert F.allclose(nf.layers[i + 1].data['h'], g.nodes[nf.layer_parent_nid(i + 1)].data['h']) assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
# Test the computation when only a few nodes are active in a layer. # Test the computation when only a few nodes are active in a layer.
g.ndata['h'] = g.ndata['h1'] g.ndata['h'] = g.ndata['h1']
...@@ -213,8 +247,7 @@ def check_flow_compute(create_node_flow, use_negative_block_id=False): ...@@ -213,8 +247,7 @@ def check_flow_compute(create_node_flow, use_negative_block_id=False):
lambda nodes: {'h' : nodes.data['t'] + 1}) lambda nodes: {'h' : nodes.data['t'] + 1})
data1 = nf.layers[i + 1].data['h'][0:4] data1 = nf.layers[i + 1].data['h'][0:4]
data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h'] data2 = g.nodes[nf.map_to_parent_nid(vs)].data['h']
assert F.allclose(data1, data2) assert_allclose(F.asnumpy(data1), F.asnumpy(data2), rtol=1e-4, atol=1e-4)
def check_flow_compute1(create_node_flow, use_negative_block_id=False): def check_flow_compute1(create_node_flow, use_negative_block_id=False):
num_layers = 2 num_layers = 2
...@@ -233,7 +266,9 @@ def check_flow_compute1(create_node_flow, use_negative_block_id=False): ...@@ -233,7 +266,9 @@ def check_flow_compute1(create_node_flow, use_negative_block_id=False):
nf.block_compute(l) nf.block_compute(l)
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'), g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h' : nodes.data['t'] + 1}) lambda nodes: {'h' : nodes.data['t'] + 1})
assert F.allclose(nf.layers[i + 1].data['h'], g.nodes[nf.layer_parent_nid(i + 1)].data['h']) assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
# test the case that we register UDFs in all blocks. # test the case that we register UDFs in all blocks.
nf = create_node_flow(g, num_layers) nf = create_node_flow(g, num_layers)
...@@ -248,8 +283,61 @@ def check_flow_compute1(create_node_flow, use_negative_block_id=False): ...@@ -248,8 +283,61 @@ def check_flow_compute1(create_node_flow, use_negative_block_id=False):
nf.block_compute(l) nf.block_compute(l)
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'), g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h' : nodes.data['t'] + 1}) lambda nodes: {'h' : nodes.data['t'] + 1})
assert F.allclose(nf.layers[i + 1].data['h'], g.nodes[nf.layer_parent_nid(i + 1)].data['h']) assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
class SrcMulEdgeMessageFunction(object):
def __init__(self, src_field, edge_field, out_field):
self.mul_op = operator.mul
self.src_field = src_field
self.edge_field = edge_field
self.out_field = out_field
def __call__(self, edges):
sdata = edges.src[self.src_field]
edata = edges.data[self.edge_field]
# Due to the different broadcasting semantics of different backends,
# we need to broadcast the sdata and edata to be of the same rank.
rank = max(F.ndim(sdata), F.ndim(edata))
sshape = F.shape(sdata)
eshape = F.shape(edata)
sdata = F.reshape(sdata, sshape + (1,) * (rank - F.ndim(sdata)))
edata = F.reshape(edata, eshape + (1,) * (rank - F.ndim(edata)))
ret = self.mul_op(sdata, edata)
return {self.out_field : ret}
def check_flow_compute2(create_node_flow):
num_layers = 2
g = generate_rand_graph(100)
g.edata['h'] = F.ones((g.number_of_edges(), 10))
nf = create_node_flow(g, num_layers)
nf.copy_from_parent()
g.ndata['h'] = g.ndata['h1']
nf.layers[0].data['h'] = nf.layers[0].data['h1']
for i in range(num_layers):
nf.block_compute(i, SrcMulEdgeMessageFunction('h', 'h', 't'), fn.sum('t', 'h1'))
nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
F.asnumpy(nf.layers[i + 1].data['h']),
rtol=1e-4, atol=1e-4)
assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
nf = create_node_flow(g, num_layers)
g.ndata['h'] = g.ndata['h1']
nf.copy_from_parent()
for i in range(nf.num_layers):
nf.layers[i].data['h'] = nf.layers[i].data['h1']
for i in range(num_layers):
nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
assert_allclose(F.asnumpy(nf.layers[i + 1].data['s']),
F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['s']),
rtol=1e-4, atol=1e-4)
def test_flow_compute(): def test_flow_compute():
check_flow_compute(create_full_nodeflow) check_flow_compute(create_full_nodeflow)
...@@ -258,6 +346,7 @@ def test_flow_compute(): ...@@ -258,6 +346,7 @@ def test_flow_compute():
check_flow_compute(create_mini_batch, use_negative_block_id=True) check_flow_compute(create_mini_batch, use_negative_block_id=True)
check_flow_compute1(create_mini_batch) check_flow_compute1(create_mini_batch)
check_flow_compute1(create_mini_batch, use_negative_block_id=True) check_flow_compute1(create_mini_batch, use_negative_block_id=True)
check_flow_compute2(create_mini_batch)
def check_prop_flows(create_node_flow): def check_prop_flows(create_node_flow):
...@@ -274,7 +363,9 @@ def check_prop_flows(create_node_flow): ...@@ -274,7 +363,9 @@ def check_prop_flows(create_node_flow):
# Test the computation on all layers. # Test the computation on all layers.
nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'), nf2.prop_flow(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h' : nodes.data['t'] + 1}) lambda nodes: {'h' : nodes.data['t'] + 1})
assert F.allclose(nf2.layers[-1].data['h'], g.nodes[nf2.layer_parent_nid(-1)].data['h']) assert_allclose(F.asnumpy(nf2.layers[-1].data['h']),
F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h']),
rtol=1e-4, atol=1e-4)
def test_prop_flows(): def test_prop_flows():
...@@ -292,12 +383,14 @@ def test_copy(): ...@@ -292,12 +383,14 @@ def test_copy():
assert len(g.ndata.keys()) == len(nf.layers[i].data.keys()) assert len(g.ndata.keys()) == len(nf.layers[i].data.keys())
for key in g.ndata.keys(): for key in g.ndata.keys():
assert key in nf.layers[i].data.keys() assert key in nf.layers[i].data.keys()
assert F.array_equal(nf.layers[i].data[key], g.nodes[nf.layer_parent_nid(i)].data[key]) assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
for i in range(nf.num_blocks): for i in range(nf.num_blocks):
assert len(g.edata.keys()) == len(nf.blocks[i].data.keys()) assert len(g.edata.keys()) == len(nf.blocks[i].data.keys())
for key in g.edata.keys(): for key in g.edata.keys():
assert key in nf.blocks[i].data.keys() assert key in nf.blocks[i].data.keys()
assert F.array_equal(nf.blocks[i].data[key], g.edges[nf.block_parent_eid(i)].data[key]) assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))
nf = create_mini_batch(g, num_layers) nf = create_mini_batch(g, num_layers)
node_embed_names = [['h'], ['h1'], ['h']] node_embed_names = [['h'], ['h1'], ['h']]
...@@ -307,12 +400,14 @@ def test_copy(): ...@@ -307,12 +400,14 @@ def test_copy():
assert len(node_embed_names[i]) == len(nf.layers[i].data.keys()) assert len(node_embed_names[i]) == len(nf.layers[i].data.keys())
for key in node_embed_names[i]: for key in node_embed_names[i]:
assert key in nf.layers[i].data.keys() assert key in nf.layers[i].data.keys()
assert F.array_equal(nf.layers[i].data[key], g.nodes[nf.layer_parent_nid(i)].data[key]) assert_array_equal(F.asnumpy(nf.layers[i].data[key]),
F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data[key]))
for i in range(nf.num_blocks): for i in range(nf.num_blocks):
assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys()) assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
for key in edge_embed_names[i]: for key in edge_embed_names[i]:
assert key in nf.blocks[i].data.keys() assert key in nf.blocks[i].data.keys()
assert F.array_equal(nf.blocks[i].data[key], g.edges[nf.block_parent_eid(i)].data[key]) assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))
nf = create_mini_batch(g, num_layers) nf = create_mini_batch(g, num_layers)
g.ndata['h0'] = F.clone(g.ndata['h']) g.ndata['h0'] = F.clone(g.ndata['h'])
...@@ -323,12 +418,13 @@ def test_copy(): ...@@ -323,12 +418,13 @@ def test_copy():
lambda nodes: {'h%d' % (i+1) : nodes.data['t'] + 1}) lambda nodes: {'h%d' % (i+1) : nodes.data['t'] + 1})
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'), g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h' : nodes.data['t'] + 1}) lambda nodes: {'h' : nodes.data['t'] + 1})
assert F.allclose(nf.layers[i + 1].data['h%d' % (i+1)], assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i+1)]),
g.nodes[nf.layer_parent_nid(i + 1)].data['h']) F.asnumpy(g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']]) nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
for i in range(num_layers + 1): for i in range(num_layers + 1):
assert F.array_equal(nf.layers[i].data['h%d' % i], assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]) F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))
nf = create_mini_batch(g, num_layers) nf = create_mini_batch(g, num_layers)
g.ndata['h0'] = F.clone(g.ndata['h']) g.ndata['h0'] = F.clone(g.ndata['h'])
...@@ -354,20 +450,26 @@ def test_block_edges(): ...@@ -354,20 +450,26 @@ def test_block_edges():
nf = create_mini_batch(g, num_layers) nf = create_mini_batch(g, num_layers)
assert nf.num_layers == num_layers + 1 assert nf.num_layers == num_layers + 1
for i in range(nf.num_blocks): for i in range(nf.num_blocks):
src, dst, eid = nf.block_edges(i, remap=True) dest_nodes = utils.toindex(nf.layer_nid(i + 1))
src1, dst1, eid1 = nf.in_edges(dest_nodes, 'all')
src, dst, eid = nf.block_edges(i)
assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
assert_array_equal(F.asnumpy(eid), F.asnumpy(eid1))
src, dst, eid = nf.block_edges(i, remap_local=True)
# should also work for negative block ids # should also work for negative block ids
src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i, remap=True) src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i,
assert F.array_equal(src, src_by_neg) remap_local=True)
assert F.array_equal(dst, dst_by_neg) assert_array_equal(F.asnumpy(src), F.asnumpy(src_by_neg))
assert F.array_equal(eid, eid_by_neg) assert_array_equal(F.asnumpy(dst), F.asnumpy(dst_by_neg))
assert_array_equal(F.asnumpy(eid), F.asnumpy(eid_by_neg))
dest_nodes = utils.toindex(nf.layer_nid(i + 1)) src1 = nf._glb2lcl_nid(src1, i)
u, v, _ = nf._graph.in_edges(dest_nodes) dst1 = nf._glb2lcl_nid(dst1, i + 1)
u = nf._glb2lcl_nid(u.tousertensor(), i) assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
v = nf._glb2lcl_nid(v.tousertensor(), i + 1) assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
assert F.array_equal(src, u)
assert F.array_equal(dst, v)
def test_block_adj_matrix(): def test_block_adj_matrix():
...@@ -376,7 +478,7 @@ def test_block_adj_matrix(): ...@@ -376,7 +478,7 @@ def test_block_adj_matrix():
nf = create_mini_batch(g, num_layers) nf = create_mini_batch(g, num_layers)
assert nf.num_layers == num_layers + 1 assert nf.num_layers == num_layers + 1
for i in range(nf.num_blocks): for i in range(nf.num_blocks):
u, v, _ = nf.block_edges(i, remap=True) u, v, _ = nf.block_edges(i, remap_local=True)
adj, _ = nf.block_adjacency_matrix(i, F.cpu()) adj, _ = nf.block_adjacency_matrix(i, F.cpu())
adj = F.sparse_to_numpy(adj) adj = F.sparse_to_numpy(adj)
...@@ -389,8 +491,8 @@ def test_block_adj_matrix(): ...@@ -389,8 +491,8 @@ def test_block_adj_matrix():
u = utils.toindex(u) u = utils.toindex(u)
coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())), coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
shape=adj.shape).todense() shape=adj.shape).todense()
assert np.array_equal(adj, coo) assert_array_equal(adj, coo)
assert np.array_equal(adj_by_neg, coo) assert_array_equal(adj_by_neg, coo)
def test_block_incidence_matrix(): def test_block_incidence_matrix():
...@@ -413,7 +515,7 @@ def test_block_incidence_matrix(): ...@@ -413,7 +515,7 @@ def test_block_incidence_matrix():
adj_by_neg = F.sparse_to_numpy(adj_by_neg) adj_by_neg = F.sparse_to_numpy(adj_by_neg)
adjs_by_neg.append(adj_by_neg) adjs_by_neg.append(adj_by_neg)
u, v, e = nf.block_edges(i, remap=True) u, v, e = nf.block_edges(i, remap_local=True)
u = utils.toindex(u) u = utils.toindex(u)
v = utils.toindex(v) v = utils.toindex(v)
e = utils.toindex(e) e = utils.toindex(e)
...@@ -429,8 +531,8 @@ def test_block_incidence_matrix(): ...@@ -429,8 +531,8 @@ def test_block_incidence_matrix():
shape=adjs[1].shape).todense() shape=adjs[1].shape).todense()
) )
for i in range(len(typestrs)): for i in range(len(typestrs)):
assert np.array_equal(adjs[i], expected[i]) assert_array_equal(adjs[i], expected[i])
assert np.array_equal(adjs_by_neg[i], expected[i]) assert_array_equal(adjs_by_neg[i], expected[i])
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment