Unverified commit b8fe2b48, authored by Qidong Su, committed by GitHub
Browse files

[Feature][Sampler] Sort CSR by tag (#1664)



* update

* update

* update

* update

* lint

* lint

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* lint

* update

* clone

* update

* update

* update

* update

* replace idarray with ndarray

* refactor cpp part

* refactor python part

* debug

* refactor interface

* test and doc

* lint and test

* lint

* fix

* fix

* fix

* const

* doc

* fix

* fix

* fix

* fix

* fix & doc

* fix

* fix

* fix

* fix

* fix

* fix

* update
Co-authored-by: Minjie Wang <wmjlyjemaine@gmail.com>
parent 0437b164
......@@ -11,6 +11,7 @@
#include <vector>
#include <tuple>
#include <string>
#include <utility>
#include "./types.h"
#include "./array_ops.h"
#include "./spmat.h"
......@@ -430,6 +431,38 @@ COOMatrix CSRRowWiseTopk(
bool ascending = false);
/*!
 * \brief Sort the column indices of each row according to the tag of each column.
 *
 * Example:
 *   indptr  = [0, 5, 8]
 *   indices = [0, 1, 2, 3, 4, 0, 1, 2]
 *
 *   tag     = [1, 1, 0, 2, 0]
 *
 * After CSRSortByTag:
 *
 *   indptr  = [0, 5, 8]
 *   indices = [2, 4, 0, 1, 3, 2, 0, 1]
 *   (tag)   = [0, 0, 1, 1, 2, 0, 1, 1]
 *   (the tag array itself is unchanged.)
 *
 * Row 0 now holds tags [0, 0, 1, 1, 2], so its split positions are
 * [0, 2, 4, 5]. Row 1 holds tags [0, 1, 1], so its split positions are
 * [0, 1, 3, 3] (tag 2 does not occur in row 1, hence the repeated 3).
 * Split positions are relative to the start of each row.
 *
 * Return:
 *   [[0, 2, 4, 5], [0, 1, 3, 3]]
 *
 * \param csr The csr matrix to be sorted
 * \param tag_array Tag of each column. IdArray with length num_cols
 * \param num_tags Number of tags. It should be equal to max(tag_array)+1.
 * \return 1. A sorted copy of the given CSR matrix
 *         2. The split positions of different tags. NDArray of shape (num_rows, num_tags + 1)
 */
std::pair<CSRMatrix, NDArray> CSRSortByTag(
    const CSRMatrix &csr,
    const IdArray tag_array,
    int64_t num_tags);
/*
* \brief Union two CSRMatrix into one CSRMatrix.
*
* Two Matrix must have the same shape.
......
......@@ -5596,7 +5596,6 @@ class DGLHeteroGraph(object):
gidx = self._graph.shared_memory(name, self.ntypes, self.etypes, formats)
return DGLHeteroGraph(gidx, self.ntypes, self.etypes)
def long(self):
"""Cast the graph to one with idtype int64
......
......@@ -44,6 +44,8 @@ __all__ = [
'to_simple',
'to_simple_graph',
'as_immutable_graph',
'sort_out_edges',
'sort_in_edges',
'metis_partition_assignment',
'partition_graph_with_halo',
'metis_partition',
......@@ -2599,4 +2601,177 @@ def as_immutable_graph(hg):
'\tdgl.as_immutable_graph will do nothing and can be removed safely in all cases.')
return hg
def sort_out_edges(g, tag, tag_offset_name='_TAG_OFFSET'):
    """Return a new graph which sorts the out edges of each node.

    Sort the out edges according to the given destination node tags in integer.
    A typical use case is to sort the edges by the destination node types, where
    the tags represent destination node types. After sorting, edges sharing
    the same tag will be arranged in a consecutive range in
    a node's adjacency list. Following is an example:

        Consider a graph as follows:

        0 -> 0, 1, 2, 3, 4
        1 -> 0, 1, 2

        Given node tags [1, 1, 0, 2, 0], each node's adjacency list
        will be sorted as follows:

        0 -> 2, 4, 0, 1, 3
        1 -> 2, 0, 1

    The function also returns the starting offsets of the tag
    segments in a tensor of shape `(N, max_tag+2)`. For node `i`,
    its out-edges connecting to nodes with tag `j` are stored between
    `tag_offsets[i][j]` ~ `tag_offsets[i][j+1]`. Since the offsets
    can be viewed as node data, they are stored in the
    `ndata` of the returned graph. Users can specify the
    ndata name with the `tag_offset_name` argument.

    Note that the function changes neither the edge IDs nor
    how the edge features are stored. The input graph must
    allow CSR format. Graph must be on CPU.

    If the input graph is heterogeneous, it must have only one edge
    type and two node types (i.e., source and destination node types).
    In this case, the provided node tags are for the destination nodes,
    and the tag offsets are stored in the source node data.

    The sorted graph and the calculated tag offsets are needed by
    certain operators that consider node tags. See `sample_neighbors_biased`
    for an example.

    Examples
    -----------

    >>> g = dgl.graph(([0,0,0,0,0,1,1,1],[0,1,2,3,4,0,1,2]))
    >>> g.adjacency_matrix(scipy_fmt='csr').nonzero()
    (array([0, 0, 0, 0, 0, 1, 1, 1], dtype=int32),
     array([0, 1, 2, 3, 4, 0, 1, 2], dtype=int32))
    >>> tag = torch.IntTensor([1,1,0,2,0])
    >>> g_sorted = dgl.transform.sort_out_edges(g, tag)
    >>> g_sorted.adjacency_matrix(scipy_fmt='csr').nonzero()
    (array([0, 0, 0, 0, 0, 1, 1, 1], dtype=int32),
     array([2, 4, 0, 1, 3, 2, 0, 1], dtype=int32))
    >>> g_sorted.ndata['_TAG_OFFSET']
    tensor([[0, 2, 4, 5],
            [0, 1, 3, 3],
            [0, 0, 0, 0],
            [0, 0, 0, 0],
            [0, 0, 0, 0]])

    Parameters
    ------------
    g : DGLGraph
        The input graph.
    tag : Tensor
        Integer tensor of shape `(N,)`, `N` being the number of (destination) nodes.
    tag_offset_name : str
        The name of the node feature to store tag offsets.

    Returns
    -------
    g_sorted : DGLGraph
        A new graph whose out edges are sorted. The node/edge features of the
        input graph are shallow-copied over.

        - `g_sorted.ndata[tag_offset_name]` : Tensor of shape `(N, max_tag + 2)`. If
          `g` is heterogeneous, get from `g_sorted.srcdata`.

    Raises
    ------
    DGLError
        If the graph has more than one edge type.
    """
    if len(g.etypes) > 1:
        raise DGLError("Only support homograph and bipartite graph")
    # The number of tag segments is derived from the largest tag value.
    num_tags = int(F.asnumpy(F.max(tag, 0))) + 1
    tag_arr = F.zerocopy_to_dgl_ndarray(tag)
    # Clone first so the input graph keeps its original graph index; only the
    # clone receives the sorted index and the offset feature.
    new_g = g.clone()
    new_g._graph, tag_pos_arr = _CAPI_DGLHeteroSortOutEdges(g._graph, tag_arr, num_tags)
    new_g.srcdata[tag_offset_name] = F.from_dgl_nd(tag_pos_arr)
    return new_g
def sort_in_edges(g, tag, tag_offset_name='_TAG_OFFSET'):
    """Return a new graph which sorts the in edges of each node.

    Sort the in edges according to the given source node tags in integer.
    A typical use case is to sort the edges by the source node types, where
    the tags represent source node types. After sorting, edges sharing
    the same tag will be arranged in a consecutive range in
    a node's adjacency list. Following is an example:

        Consider a graph as follows:

        0 <- 0, 1, 2, 3, 4
        1 <- 0, 1, 2

        Given node tags [1, 1, 0, 2, 0], each node's adjacency list
        will be sorted as follows:

        0 <- 2, 4, 0, 1, 3
        1 <- 2, 0, 1

    The function also returns the starting offsets of the tag
    segments in a tensor of shape `(N, max_tag+2)`. For node `i`,
    its in-edges connecting to nodes with tag `j` are stored between
    `tag_offsets[i][j]` ~ `tag_offsets[i][j+1]`. Since the offsets
    can be viewed as node data, they are stored in the
    `ndata` of the returned graph. Users can specify the
    ndata name with the `tag_offset_name` argument.

    Note that the function changes neither the edge IDs nor
    how the edge features are stored. The input graph must
    allow CSR format. Graph must be on CPU.

    If the input graph is heterogeneous, it must have only one edge
    type and two node types (i.e., source and destination node types).
    In this case, the provided node tags are for the source nodes,
    and the tag offsets are stored in the destination node data.

    The sorted graph and the calculated tag offsets are needed by
    certain operators that consider node tags. See `sample_neighbors_biased`
    for an example.

    Examples
    -----------

    >>> g = dgl.graph(([0,1,2,3,4,0,1,2],[0,0,0,0,0,1,1,1]))
    >>> g.adjacency_matrix(scipy_fmt='csr', transpose=False).nonzero()
    (array([0, 0, 0, 0, 0, 1, 1, 1], dtype=int32),
     array([0, 1, 2, 3, 4, 0, 1, 2], dtype=int32))
    >>> tag = torch.IntTensor([1,1,0,2,0])
    >>> g_sorted = dgl.transform.sort_in_edges(g, tag)
    >>> g_sorted.adjacency_matrix(scipy_fmt='csr', transpose=False).nonzero()
    (array([0, 0, 0, 0, 0, 1, 1, 1], dtype=int32),
     array([2, 4, 0, 1, 3, 2, 0, 1], dtype=int32))
    >>> g_sorted.ndata['_TAG_OFFSET']
    tensor([[0, 2, 4, 5],
            [0, 1, 3, 3],
            [0, 0, 0, 0],
            [0, 0, 0, 0],
            [0, 0, 0, 0]])

    Parameters
    ------------
    g : DGLGraph
        The input graph.
    tag : Tensor
        Integer tensor of shape `(N,)`, `N` being the number of (source) nodes.
    tag_offset_name : str
        The name of the node feature to store tag offsets.

    Returns
    -------
    g_sorted : DGLGraph
        A new graph whose in edges are sorted. The node/edge features of the
        input graph are shallow-copied over.

        - `g_sorted.ndata[tag_offset_name]` : Tensor of shape `(N, max_tag + 2)`. If
          `g` is heterogeneous, get from `g_sorted.dstdata`.

    Raises
    ------
    DGLError
        If the graph has more than one edge type.
    """
    if len(g.etypes) > 1:
        raise DGLError("Only support homograph and bipartite graph")
    # The number of tag segments is derived from the largest tag value.
    num_tags = int(F.asnumpy(F.max(tag, 0))) + 1
    tag_arr = F.zerocopy_to_dgl_ndarray(tag)
    # Clone first so the input graph keeps its original graph index; only the
    # clone receives the sorted index and the offset feature.
    new_g = g.clone()
    new_g._graph, tag_pos_arr = _CAPI_DGLHeteroSortInEdges(g._graph, tag_arr, num_tags)
    new_g.dstdata[tag_offset_name] = F.from_dgl_nd(tag_pos_arr)
    return new_g
_init_api("dgl.transform")
......@@ -519,6 +519,21 @@ void CSRSort_(CSRMatrix* csr) {
});
}
// Device/dtype dispatcher for CSRSortByTag: validates the tag array against
// the matrix and forwards to the implementation selected by the CSR's device
// and index type and by the tag array's element type.
std::pair<CSRMatrix, NDArray> CSRSortByTag(
    const CSRMatrix &csr, IdArray tag, int64_t num_tags) {
  // One tag per column is required; the tag array must be an integer array
  // living in the same context as the column indices.
  CHECK_EQ(csr.num_cols, tag->shape[0])
    << "The length of the tag array should be equal to the number of columns ";
  CHECK_SAME_CONTEXT(csr.indices, tag);
  CHECK_INT(tag, "tag");

  // Filled in by whichever specialization the switches below select.
  std::pair<CSRMatrix, NDArray> sorted_and_offsets;
  ATEN_CSR_SWITCH(csr, XPU, IdType, "CSRSortByTag", {
    ATEN_ID_TYPE_SWITCH(tag->dtype, TagType, {
      sorted_and_offsets =
          impl::CSRSortByTag<XPU, IdType, TagType>(csr, tag, num_tags);
    });
  });
  return sorted_and_offsets;
}
CSRMatrix CSRReorder(CSRMatrix csr, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids) {
CSRMatrix ret;
ATEN_CSR_SWITCH(csr, XPU, IdType, "CSRReorder", {
......
......@@ -146,6 +146,10 @@ CSRMatrix CSRSliceMatrix(CSRMatrix csr, runtime::NDArray rows, runtime::NDArray
template <DLDeviceType XPU, typename IdType>
void CSRSort_(CSRMatrix* csr);
// Implementation of CSRSortByTag, templated on the device (XPU), the CSR
// index type (IdType) and the element type of the tag array (TagType).
// Returns the sorted CSR matrix paired with the per-row tag split positions.
template <DLDeviceType XPU, typename IdType, typename TagType>
std::pair<CSRMatrix, NDArray> CSRSortByTag(
    const CSRMatrix &csr, IdArray tag_array, int64_t num_tags);
template <DLDeviceType XPU, typename IdType>
CSRMatrix CSRReorder(CSRMatrix csr, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids);
......
......@@ -78,6 +78,73 @@ void CSRSort_(CSRMatrix* csr) {
template void CSRSort_<kDLCPU, int64_t>(CSRMatrix* csr);
template void CSRSort_<kDLCPU, int32_t>(CSRMatrix* csr);
template <DLDeviceType XPU, typename IdType, typename TagType>
std::pair<CSRMatrix, NDArray> CSRSortByTag(
const CSRMatrix &csr, const IdArray tag_array, int64_t num_tags) {
const auto indptr_data = static_cast<const IdType *>(csr.indptr->data);
const auto indices_data = static_cast<const IdType *>(csr.indices->data);
const auto eid_array = aten::CSRHasData(csr) ? csr.data :
aten::Range(0, csr.indices->shape[0], csr.indptr->dtype.bits, csr.indptr->ctx);
const auto eid_data = static_cast<const IdType *>(csr.data->data);
const auto tag_data = static_cast<const TagType *>(tag_array->data);
const int64_t num_rows = csr.num_rows;
NDArray tag_pos = NDArray::Empty({csr.num_rows, num_tags + 1},
csr.indptr->dtype, csr.indptr->ctx);
auto tag_pos_data = static_cast<IdType *>(tag_pos->data);
std::fill(tag_pos_data, tag_pos_data + csr.num_rows * (num_tags + 1), 0);
aten::CSRMatrix output(csr.num_rows, csr.num_cols,
csr.indptr.Clone(), csr.indices.Clone(),
eid_array.Clone(), csr.sorted);
auto out_indices_data = static_cast<IdType *>(output.indices->data);
auto out_eid_data = static_cast<IdType *>(output.data->data);
#pragma omp parallel for
for (IdType src = 0 ; src < num_rows ; ++src) {
const IdType start = indptr_data[src];
const IdType end = indptr_data[src + 1];
auto tag_pos_row = tag_pos_data + src * (num_tags + 1);
std::vector<IdType> pointer(num_tags, 0);
for (IdType ptr = start ; ptr < end ; ++ptr) {
const IdType dst = indices_data[ptr];
const TagType tag = tag_data[dst];
CHECK_LT(tag, num_tags);
++tag_pos_row[tag + 1];
} // count
for (TagType tag = 1 ; tag <= num_tags; ++tag) {
tag_pos_row[tag] += tag_pos_row[tag - 1];
} // cumulate
for (IdType ptr = start ; ptr < end ; ++ptr) {
const IdType dst = indices_data[ptr];
const IdType eid = eid_data[ptr];
const TagType tag = tag_data[dst];
const IdType offset = tag_pos_row[tag] + pointer[tag];
CHECK_LT(offset, tag_pos_row[tag + 1]);
++pointer[tag];
out_indices_data[start + offset] = dst;
out_eid_data[start + offset] = eid;
}
}
output.sorted = false;
return std::make_pair(output, tag_pos);
}
// Explicit CPU instantiations for every combination of 32-/64-bit index type
// (second template argument) and 32-/64-bit tag type (third template argument).
template std::pair<CSRMatrix, NDArray> CSRSortByTag<kDLCPU, int64_t, int64_t>(
    const CSRMatrix &csr, const IdArray tag, int64_t num_tags);
template std::pair<CSRMatrix, NDArray> CSRSortByTag<kDLCPU, int64_t, int32_t>(
    const CSRMatrix &csr, const IdArray tag, int64_t num_tags);
template std::pair<CSRMatrix, NDArray> CSRSortByTag<kDLCPU, int32_t, int64_t>(
    const CSRMatrix &csr, const IdArray tag, int64_t num_tags);
template std::pair<CSRMatrix, NDArray> CSRSortByTag<kDLCPU, int32_t, int32_t>(
    const CSRMatrix &csr, const IdArray tag, int64_t num_tags);
} // namespace impl
} // namespace aten
} // namespace dgl
......@@ -657,6 +657,51 @@ DGL_REGISTER_GLOBAL("transform._CAPI_DGLAsImmutableGraph")
*rv = GraphRef(hg->AsImmutableGraph());
});
// C API entry: sort the out-edges (CSR) of edge type 0 by tag.
// Arguments: (graph, tag array, number of tags).
// Returns a list holding the re-created graph and the tag offset array.
DGL_REGISTER_GLOBAL("transform._CAPI_DGLHeteroSortOutEdges")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    HeteroGraphRef hg = args[0];
    NDArray tag = args[1];
    int64_t num_tag = args[2];

    // Both the graph and the tag array have to be on CPU.
    CHECK_EQ(hg->Context().device_type, kDLCPU) << "Only support sorting by tag on cpu";
    CHECK(aten::IsValidIdArray(tag));
    CHECK_EQ(tag->ctx.device_type, kDLCPU) << "Only support sorting by tag on cpu";

    // first: sorted CSR matrix, second: per-row tag offsets.
    auto sorted = aten::CSRSortByTag(hg->GetCSRMatrix(0), tag, num_tag);
    HeteroGraphPtr sorted_graph =
        CreateFromCSR(hg->NumVertexTypes(), sorted.first, ALL_CODE);

    List<ObjectRef> results;
    results.push_back(HeteroGraphRef(sorted_graph));
    results.push_back(Value(MakeValue(sorted.second)));
    *rv = results;
  });
// C API entry: sort the in-edges (CSC) of edge type 0 by tag.
// Arguments: (graph, tag array, number of tags).
// Returns a list holding the re-created graph and the tag offset array.
DGL_REGISTER_GLOBAL("transform._CAPI_DGLHeteroSortInEdges")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    HeteroGraphRef hg = args[0];
    NDArray tag = args[1];
    int64_t num_tag = args[2];

    // Both the graph and the tag array have to be on CPU.
    CHECK_EQ(hg->Context().device_type, kDLCPU) << "Only support sorting by tag on cpu";
    CHECK(aten::IsValidIdArray(tag));
    CHECK_EQ(tag->ctx.device_type, kDLCPU) << "Only support sorting by tag on cpu";

    // The CSC matrix is handed to the same CSR routine.
    // first: sorted matrix, second: per-row tag offsets.
    auto sorted = aten::CSRSortByTag(hg->GetCSCMatrix(0), tag, num_tag);
    HeteroGraphPtr sorted_graph =
        CreateFromCSC(hg->NumVertexTypes(), sorted.first, ALL_CODE);

    List<ObjectRef> results;
    results.push_back(HeteroGraphRef(sorted_graph));
    results.push_back(Value(MakeValue(sorted.second)));
    *rv = results;
  });
DGL_REGISTER_GLOBAL("heterograph._CAPI_DGLFindSrcDstNtypes")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef metagraph = args[0];
......
import dgl
import dgl.function as fn
from collections import Counter
import numpy as np
import scipy.sparse as ssp
import itertools
import backend as F
import networkx as nx
import unittest, pytest
from dgl import DGLError
from utils import parametrize_dtype
def create_test_heterograph(num_nodes, num_adj, idtype):
    """Build a random graph with ``num_nodes`` nodes for the sorting tests.

    ``num_adj`` is either an int (every node gets exactly that many out-edges)
    or a pair ``[low, high)`` from which each node's out-degree is drawn.
    The destinations of one node are drawn without replacement, so a node
    never has duplicate out-edges. The result is ``dgl.graph((src, dst))``
    with the requested ``idtype``.
    """
    if isinstance(num_adj, int):
        num_adj = [num_adj, num_adj+1]
    low, high = num_adj[0], num_adj[1]
    # Draw all out-degrees first, then the neighbours of each node in turn.
    degrees = list(np.random.choice(np.arange(low, high), num_nodes))
    src_chunks = [[node] * degrees[node] for node in range(num_nodes)]
    dst_chunks = [np.random.choice(num_nodes, degree, replace=False)
                  for degree in degrees]
    src = np.concatenate(src_chunks)
    dst = np.concatenate(dst_chunks)
    return dgl.graph((src, dst), idtype=idtype)
def check_sort(spm, tag_arr=None, tag_pos=None):
    """Return True if every row of ``spm`` lists its columns in tag order.

    Parameters
    ----------
    spm : scipy sparse matrix
        Matrix whose rows are inspected in their stored column order.
    tag_arr : Tensor, optional
        Tag of each column. Defaults to ``arange(spm.shape[0])``, i.e. the
        column IDs themselves are used as tags.
    tag_pos : Tensor, optional
        Per-row tag offsets. When given, each position where the tag changes
        within a row is also compared against these offsets.

    Returns
    -------
    bool
        False as soon as a violation is found, True otherwise.
    """
    tags = np.arange(spm.shape[0]) if tag_arr is None else F.asnumpy(tag_arr)
    offsets = None if tag_pos is None else F.asnumpy(tag_pos)
    verify_offsets = offsets is not None

    for row_id in range(spm.shape[0]):
        cols = spm.getrow(row_id).nonzero()[1]
        if verify_offsets:
            row_offsets = offsets[row_id]
            # Tag of the segment currently being walked through.
            expected_tag = tags[cols[0]] if len(cols) > 0 else 0
        for pos in range(1, len(cols)):
            prev_tag = tags[cols[pos - 1]]
            next_tag = tags[cols[pos]]
            if verify_offsets and prev_tag != expected_tag:
                # The entry does not carry the tag of the current segment.
                return False
            if prev_tag > next_tag:
                # Tags must be non-decreasing along a sorted row.
                return False
            if verify_offsets and prev_tag < next_tag:
                if pos != int(row_offsets[expected_tag + 1]):
                    # A tag boundary must coincide with the offset recorded
                    # in `tag_pos` for the end of the current segment.
                    return False
                expected_tag = next_tag
    return True
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sorting by tag not implemented")
@parametrize_dtype
def test_sort_with_tag(idtype):
    """Sort out-/in-edges of a random homogeneous graph by random node tags."""
    num_nodes, num_adj, num_tags = 200, [20, 50], 5
    graph = create_test_heterograph(num_nodes, num_adj, idtype=idtype)
    node_tags = F.tensor(np.random.choice(num_tags, graph.number_of_nodes()))

    # Out-edges: rows of the CSR adjacency must come out grouped by tag.
    out_sorted = dgl.sort_out_edges(graph, node_tags)
    csr_before = graph.adjacency_matrix(scipy_fmt='csr')
    csr_after = out_sorted.adjacency_matrix(scipy_fmt='csr')
    assert check_sort(csr_after, node_tags, out_sorted.ndata["_TAG_OFFSET"])
    # The input graph must still be in its original, unsorted order.
    assert not check_sort(csr_before, node_tags)

    # In-edges: same checks on the non-transposed adjacency.
    in_sorted = dgl.sort_in_edges(graph, node_tags)
    csc_before = graph.adjacency_matrix(transpose=False, scipy_fmt='csr')
    csc_after = in_sorted.adjacency_matrix(transpose=False, scipy_fmt='csr')
    assert check_sort(csc_after, node_tags, in_sorted.ndata["_TAG_OFFSET"])
    assert not check_sort(csc_before, node_tags)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sorting by tag not implemented")
@parametrize_dtype
def test_sort_with_tag_bipartite(idtype):
    """Sort out-/in-edges of a random bipartite graph by random node tags."""
    num_nodes, num_adj, num_tags = 200, [20, 50], 5
    graph = create_test_heterograph(num_nodes, num_adj, idtype=idtype)
    # Reuse the random edges as a single '_U' -> '_V' relation.
    graph = dgl.heterograph({('_U', '_E', '_V') : graph.edges()})
    src_tags = F.tensor(np.random.choice(num_tags, graph.number_of_nodes('_U')))
    dst_tags = F.tensor(np.random.choice(num_tags, graph.number_of_nodes('_V')))

    # Out-edges are sorted by destination tags; offsets live on '_U' nodes.
    out_sorted = dgl.sort_out_edges(graph, dst_tags)
    csr_before = graph.adjacency_matrix(scipy_fmt='csr')
    csr_after = out_sorted.adjacency_matrix(scipy_fmt='csr')
    assert check_sort(csr_after, dst_tags, out_sorted.nodes['_U'].data['_TAG_OFFSET'])
    assert not check_sort(csr_before, dst_tags)

    # In-edges are sorted by source tags; offsets live on '_V' nodes.
    in_sorted = dgl.sort_in_edges(graph, src_tags)
    csc_before = graph.adjacency_matrix(transpose=False, scipy_fmt='csr')
    csc_after = in_sorted.adjacency_matrix(transpose=False, scipy_fmt='csr')
    assert check_sort(csc_after, src_tags, in_sorted.nodes['_V'].data['_TAG_OFFSET'])
    assert not check_sort(csc_before, src_tags)
# Allow running this file directly; both tests are run with int32 graph IDs only.
if __name__ == "__main__":
    test_sort_with_tag(F.int32)
    test_sort_with_tag_bipartite(F.int32)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment