Unverified Commit 70a499e3 authored by Israt Nisa's avatar Israt Nisa Committed by GitHub
Browse files

[Feature] Add CUDA support for `min` and `max` reducer in heterogeneous API...


[Feature] Add CUDA support for `min` and `max` reducer in heterogeneous API for unary message functions (#3566)

* CUDA support max/min reducer on forward pass

* docstring

* concised UpdateGradMinMax_hetero

* reorganized UpdateGradMinMax_hetero

* CUDA kernels for max/min reducer

* variable name

* lint check

* changed CUDA 2D thread mapping to 1D

* removed legacy cusparse for min/max reducer

* git CI issue

* restarting git CI

* adding namespace std
Co-authored-by: default avatarIsrat Nisa <nisisrat@amazon.com>
Co-authored-by: default avatarQuan (Andy) Gan <coin2028@hotmail.com>
parent dd762a1e
......@@ -105,6 +105,7 @@ void ScatterAdd(NDArray feat, NDArray idx, NDArray out) {
}
/*!
* \brief CPU kernel to update gradients for reduce op max/min
* \param graph The input heterogeneous graph.
* \param op The binary operator, could be `copy_u`, `copy_e'.
* \param list_feat List of the input tensors.
......@@ -117,61 +118,35 @@ void UpdateGradMinMax_hetero(HeteroGraphPtr graph,
const std::string& op,
const std::vector<NDArray>& list_feat,
const std::vector<NDArray>& list_idx,
const std::vector<NDArray>& list_idx_ntypes,
const std::vector<NDArray>& list_idx_types,
std::vector<NDArray>* list_out) {
if (op == "copy_lhs") {
std::vector<std::vector<dgl_id_t>> dst_src_ntids(graph->NumVertexTypes(),
if (op == "copy_lhs" || op == "copy_rhs") {
std::vector<std::vector<dgl_id_t>> src_dst_ntypes(graph->NumVertexTypes(),
std::vector<dgl_id_t>());
for (dgl_type_t etype = 0; etype < graph->NumEdgeTypes(); ++etype) {
auto pair = graph->meta_graph()->FindEdge(etype);
const dgl_id_t dst_id = pair.first; // graph is reversed
const dgl_id_t src_id = pair.second;
dst_src_ntids[dst_id].push_back(src_id); // can have duplicates. Use Hashtable to optimize.
}
std::vector<bool> updated(graph->NumVertexTypes());
for (int dst_id = 0; dst_id < dst_src_ntids.size(); ++dst_id) {
std::fill(updated.begin(), updated.end(), false);
for (int j = 0; j < dst_src_ntids[dst_id].size(); ++j) {
int src_id = dst_src_ntids[dst_id][j];
if (updated[src_id]) continue;
const DType* feat_data = list_feat[dst_id].Ptr<DType>();
const IdType* idx_data = list_idx[dst_id].Ptr<IdType>();
const IdType* idx_ntype_data = list_idx_ntypes[dst_id].Ptr<IdType>();
DType* out_data = (*list_out)[src_id].Ptr<DType>();
int dim = 1;
for (int i = 1; i < (*list_out)[src_id]->ndim; ++i)
dim *= (*list_out)[src_id]->shape[i];
int n = list_feat[dst_id]->shape[0];
#pragma omp parallel for
for (int i = 0; i < n; ++i) {
for (int k = 0; k < dim; ++k) {
if (src_id == idx_ntype_data[i * dim + k]) {
const int write_row = idx_data[i * dim + k];
#pragma omp atomic
out_data[write_row * dim + k] += feat_data[i * dim + k]; // feat = dZ
}
}
}
updated[src_id] = true;
}
}
} else if (op == "copy_rhs") {
for (dgl_type_t etid = 0; etid < graph->NumEdgeTypes(); ++etid) {
auto pair = graph->meta_graph()->FindEdge(etid);
const dgl_id_t dst_id = pair.first; // graph is reversed
const dgl_id_t src_id = pair.second;
const DType* feat_data = list_feat[dst_id].Ptr<DType>();
const IdType* idx_data = list_idx[dst_id].Ptr<IdType>();
const IdType* idx_ntype_data = list_idx_ntypes[dst_id].Ptr<IdType>();
DType* out_data = (*list_out)[etid].Ptr<DType>();
const dgl_id_t dst_ntype = pair.first; // graph is reversed
const dgl_id_t src_ntype = pair.second;
auto same_src_dst_ntype = std::find(std::begin(src_dst_ntypes[dst_ntype]),
std::end(src_dst_ntypes[dst_ntype]), src_ntype);
// if op is "copy_lhs", relation type with same src and dst node type will be updated once
if (op == "copy_lhs" && same_src_dst_ntype != std::end(src_dst_ntypes[dst_ntype]))
continue;
src_dst_ntypes[dst_ntype].push_back(src_ntype);
const DType* feat_data = list_feat[dst_ntype].Ptr<DType>();
const IdType* idx_data = list_idx[dst_ntype].Ptr<IdType>();
const IdType* idx_type_data = list_idx_types[dst_ntype].Ptr<IdType>();
int type = (op == "copy_lhs") ? src_ntype : etype;
DType* out_data = (*list_out)[type].Ptr<DType>();
int dim = 1;
for (int i = 1; i < (*list_out)[etid]->ndim; ++i)
dim *= (*list_out)[etid]->shape[i];
int n = list_feat[dst_id]->shape[0];
for (int i = 1; i < (*list_out)[type]->ndim; ++i)
dim *= (*list_out)[type]->shape[i];
int n = list_feat[dst_ntype]->shape[0];
#pragma omp parallel for
for (int i = 0; i < n; ++i) {
for (int k = 0; k < dim; ++k) {
if (etid == idx_ntype_data[i * dim + k]) {
if (type == idx_type_data[i * dim + k]) {
const int write_row = idx_data[i * dim + k];
#pragma omp atomic
out_data[write_row * dim + k] += feat_data[i * dim + k]; // feat = dZ
......
......@@ -319,12 +319,19 @@ void SpMMCmpCsr(const BcastOff& bcast, const CSRMatrix& csr, NDArray ufeat,
* \param argu Arg-Min/Max on source nodes, which refers the source node indices
* correspond to the minimum/maximum values of reduction result on
* destination nodes. It's useful in computing gradients of Min/Max
* reducer. \param arge Arg-Min/Max on edges. which refers the source node
* indices correspond to the minimum/maximum values of reduction result on
* reducer.
* \param arge Arg-Min/Max on edges. which refers the source node
* indices correspond to the minimum/maximum values of reduction result on
* destination nodes. It's useful in computing gradients of Min/Max
* reducer. \note It uses node parallel strategy, different threads are
* responsible for the computation of different nodes. \note The result will
* contain infinity for zero-degree nodes.
* reducer.
* \param argu_ntype Node type of the arg-Min/Max on source nodes, which refers the
* source node types correspond to the minimum/maximum values of reduction result
* on destination nodes. It's useful in computing gradients of Min/Max reducer.
* \param arge_etype Edge-type of the arg-Min/Max on edges. which refers the source
* node indices correspond to the minimum/maximum values of reduction result on
* destination nodes. It's useful in computing gradients of Min/Max reducer.
* \param src_type Node type of the source nodes of an etype
* \param etype Edge type
*/
template <typename IdType, typename DType, typename Op, typename Cmp>
void SpMMCmpCsrHetero(const BcastOff& bcast, const CSRMatrix& csr, NDArray ufeat,
......
......@@ -58,7 +58,7 @@ void UpdateGradMinMax_hetero(const HeteroGraphPtr& g,
const std::vector<NDArray>& idx_etype,
std::vector<NDArray>* out) {
SWITCH_BITS(bits, DType, {
LOG(FATAL) << "Not implemented. Please use CPU version.";
cuda::UpdateGradMinMax_hetero<IdType, DType>(g, op, feat, idx, idx_etype, out);
});
}
......
......@@ -63,6 +63,34 @@ __global__ void ScatterAddKernel(
}
}
/*!
* \brief CUDA kernel to update gradients for reduce op max/min
* \note each WARP (group of 32 threads) is responsible for adding a row in
* feature tensor to a target row in output tensor.
*/
template <typename IdType, typename DType>
__global__ void UpdateGradMinMaxHeteroKernel(
const DType *feat, const IdType *idx, const IdType *idx_type, DType *out,
int64_t n, int64_t dim, int type) {
unsigned int tId = threadIdx.x;
unsigned int laneId = tId & 31;
unsigned int gId = blockIdx.x * blockDim.x + threadIdx.x;
unsigned int warpId = gId >> 5;
unsigned int warp_size = 32;
unsigned int row = warpId;
while (row < n) {
for(unsigned int col = laneId; col < dim; col += warp_size) {
if (type == idx_type[row * dim + col]) {
const int write_row = idx[row * dim + col];
cuda::AtomicAdd(out + write_row * dim + col, feat[row * dim + col]);
}
}
row += blockDim.x * gridDim.x;
}
}
/*!
* \brief CUDA kernel of backward phase in segment min/max.
* \note each blockthread is responsible for writing a row in the
......@@ -155,6 +183,58 @@ void ScatterAdd(
n, dim);
}
/*!
* \brief CUDA implementation to update gradients for reduce op max/min
* \param graph The input heterogeneous graph.
* \param op The binary operator, could be `copy_u`, `copy_e'.
* \param list_feat List of the input tensors.
* \param list_idx List of the indices tensors.
* \param list_idx_etype List of the node- or edge-type tensors.
* \param list_out List of the output tensors.
*/
template <typename IdType, typename DType>
void UpdateGradMinMax_hetero(const HeteroGraphPtr& graph,
const std::string& op,
const std::vector<NDArray>& list_feat,
const std::vector<NDArray>& list_idx,
const std::vector<NDArray>& list_idx_types,
std::vector<NDArray>* list_out) {
if (op == "copy_lhs" || op == "copy_rhs") {
std::vector<std::vector<dgl_id_t>> src_dst_ntypes(graph->NumVertexTypes(),
std::vector<dgl_id_t>());
for (dgl_type_t etype = 0; etype < graph->NumEdgeTypes(); ++etype) {
auto pair = graph->meta_graph()->FindEdge(etype);
const dgl_id_t dst_ntype = pair.first; // graph is reversed
const dgl_id_t src_ntype = pair.second;
auto same_src_dst_ntype = std::find(std::begin(src_dst_ntypes[dst_ntype]),
std::end(src_dst_ntypes[dst_ntype]), src_ntype);
// if op is "copy_lhs", relation type with same src and dst node type will be updated once
if (op == "copy_lhs" && same_src_dst_ntype != std::end(src_dst_ntypes[dst_ntype]))
continue;
src_dst_ntypes[dst_ntype].push_back(src_ntype);
const DType* feat_data = list_feat[dst_ntype].Ptr<DType>();
const IdType* idx_data = list_idx[dst_ntype].Ptr<IdType>();
const IdType* idx_type_data = list_idx_types[dst_ntype].Ptr<IdType>();
int type = (op == "copy_lhs") ? src_ntype : etype;
DType* out_data = (*list_out)[type].Ptr<DType>();
int dim = 1;
for (int i = 1; i < (*list_out)[type]->ndim; ++i)
dim *= (*list_out)[type]->shape[i];
int n = list_feat[dst_ntype]->shape[0];
auto *thr_entry = runtime::CUDAThreadEntry::ThreadLocal();
const int th_per_row = 32;
const int ntx = 128;
const int nbx = FindNumBlocks<'x'>((n * th_per_row + ntx - 1) / ntx);
const dim3 nblks(nbx);
const dim3 nthrs(ntx);
CUDA_KERNEL_CALL((UpdateGradMinMaxHeteroKernel<IdType, DType>),
nblks, nthrs, 0, thr_entry->stream,
feat_data, idx_data, idx_type_data,
out_data, n, dim, type);
}
}
}
/*!
* \brief CUDA implementation of backward phase of Segment Reduce with Min/Max reducer.
* \note math equation: out[arg[i, k], k] = feat[i, k]
......
......@@ -526,7 +526,7 @@ void SpMMCsrHetero(const std::string& op, const std::string& reduce,
std::vector<DType*> trans_out((*vec_out).size(), NULL);
bool use_legacy_cusparsemm =
(CUDART_VERSION < 11000) &&
(CUDART_VERSION < 11000) && (reduce == "sum") &&
// legacy cuSPARSE does not care about NNZ, hence the argument "false".
((op == "copy_lhs" && cusparse_available<bits, IdType>(false)) ||
(op == "mul" && is_scalar_efeat && cusparse_available<bits, IdType>(false)));
......@@ -542,7 +542,6 @@ void SpMMCsrHetero(const std::string& op, const std::string& reduce,
trans_out[ntype] = out;
}
}
// Check shape of ufeat for all relation type and compute feature size
int64_t x_length = 1;
for (dgl_type_t etype = 0; etype < (ufeat_ntids.size() - 1); ++etype) {
......@@ -565,6 +564,30 @@ void SpMMCsrHetero(const std::string& op, const std::string& reduce,
x_length *= ufeat->shape[i];
}
}
// TODO(Israt): Can python do the following initializations while creating the tensors?
if (reduce == "max" || reduce == "min") {
const int64_t dim = bcast.out_len;
std::vector<bool> updated((*vec_out).size(), false);
for (dgl_type_t etype = 0; etype < ufeat_ntids.size(); ++etype) {
DType *out_off = (*vec_out)[out_ntids[etype]].Ptr<DType>();
if (reduce == "max")
_Fill(out_off, vec_csr[etype].num_rows * dim, cuda::reduce::Max<IdType, DType>::zero());
else // min
_Fill(out_off, vec_csr[etype].num_rows * dim, cuda::reduce::Min<IdType, DType>::zero());
const dgl_type_t dst_id = out_ntids[etype];
if (!updated[dst_id]) {
updated[dst_id] = true;
if (op == "copy_lhs") {
IdType *argu_ntype = (*out_aux)[2][dst_id].Ptr<IdType>();
_Fill(argu_ntype, vec_csr[etype].num_rows * dim, static_cast<IdType>(-1));
}
if (op == "copy_rhs") {
IdType *arge_etype = (*out_aux)[3][dst_id].Ptr<IdType>();
_Fill(arge_etype, vec_csr[etype].num_rows * dim, static_cast<IdType>(-1));
}
}
}
}
auto* thr_entry = runtime::CUDAThreadEntry::ThreadLocal();
for (dgl_type_t etype = 0; etype < ufeat_ntids.size(); ++etype) {
......@@ -606,7 +629,28 @@ void SpMMCsrHetero(const std::string& op, const std::string& reduce,
bcast, csr, ufeat, efeat, (*vec_out)[dst_id], NullArray(), NullArray());
});
}
// TODO(Israt): Add support for max/min reducer
} else if (reduce == "max") {
SWITCH_OP(op, Op, {
NDArray ufeat = (vec_ufeat.size() == 0) ?
NullArray() : vec_ufeat[src_id];
NDArray efeat = (vec_efeat.size() == 0) ?
NullArray() : vec_efeat[etype];
cuda::SpMMCmpCsrHetero<IdType, DType, Op, cuda::reduce::Max<IdType, DType> >(
bcast, csr, ufeat, efeat, (*vec_out)[dst_id], (*out_aux)[0][dst_id],
(*out_aux)[1][dst_id], (*out_aux)[2][dst_id], (*out_aux)[3][dst_id],
src_id, etype);
});
} else if (reduce == "min") {
SWITCH_OP(op, Op, {
NDArray ufeat = (vec_ufeat.size() == 0) ?
NullArray() : vec_ufeat[src_id];
NDArray efeat = (vec_efeat.size() == 0) ?
NullArray() : vec_efeat[etype];
cuda::SpMMCmpCsrHetero<IdType, DType, Op, cuda::reduce::Min<IdType, DType> >(
bcast, csr, ufeat, efeat, (*vec_out)[dst_id], (*out_aux)[0][dst_id],
(*out_aux)[1][dst_id], (*out_aux)[2][dst_id], (*out_aux)[3][dst_id],
src_id, etype);
});
} else {
LOG(FATAL) << "Not implemented";
}
......
......@@ -160,17 +160,11 @@ __global__ void SpMMCsrKernel(
DType out = BinaryOp::Call(uoff + lhs_add, eoff + rhs_add);
ReduceOp::Call(&local_accum, &local_argu, &local_arge, out, cid, eid);
}
// TODO(isratnisa, BarclayII)
// The use of += is a quick hack to compute for cross-type reducing
// The use of += is to compute cross-type reducing on heterogeneous graph
// when reduce op is `sum`.
// C = SpMM(SpA, B) + C
// To make it work on max-reducer and min-reducer, i.e.
// C = Max(SpMM<BinaryOp, Max>(SpA, B), C)
// it requires at least the following:
// 1. Initialize the output buffer with ReducerOp::zero.
// 2. Record also which edge type has the maximum/minimum in argmax/argmin.
// This requires non-trivial changes in SpMMCsrKernel itself or writing a new kernel.
// So we leave it to future PRs.
// Separate kernel `SpMMCmpCsrHeteroKernel` is used for max- and min-reducer. It
// does not affect the output on homogeneous graph as `out` is initialized to zero.
out[ty * out_len + tx] += local_accum;
if (ReduceOp::require_arg && BinaryOp::use_lhs)
arg_u[ty * out_len + tx] = local_argu;
......@@ -182,6 +176,67 @@ __global__ void SpMMCsrKernel(
}
}
/*!
* \brief CUDA kernel of SpMM-Min/Max on Csr format.
* \note it uses node parallel strategy, different threadblocks (on y-axis)
* is responsible for the computation on different destination nodes.
* Threadblocks on the x-axis are responsible for the computation on
* different positions in feature dimension.
*/
template <typename Idx, typename DType,
typename BinaryOp, typename ReduceOp,
bool UseBcast = false, bool UseIdx = false>
__global__ void SpMMCmpCsrHeteroKernel(
const DType* __restrict__ ufeat,
const DType* __restrict__ efeat,
DType* __restrict__ out,
Idx* __restrict__ arg_u, Idx* __restrict__ arg_e,
Idx* __restrict__ arg_u_ntype, Idx* __restrict__ arg_e_etype,
const Idx* __restrict__ indptr,
const Idx* __restrict__ indices,
const Idx* __restrict__ edge_map,
int64_t num_rows, int64_t num_cols,
const int64_t* __restrict__ ubcast_off,
const int64_t* __restrict__ ebcast_off,
int64_t ufeat_len, int64_t efeat_len, int64_t out_len,
const int src_type, const int etype) {
// SPMM with CSR.
int ty = blockIdx.y * blockDim.y + threadIdx.y;
const Idx stride_y = blockDim.y * gridDim.y;
const int stride_x = blockDim.x * gridDim.x;
while (ty < num_rows) {
int tx = blockIdx.x * blockDim.x + threadIdx.x;
while (tx < out_len) {
DType new_out = out[ty * out_len + tx];//ReduceOp::zero();
Idx local_argu = 0, local_arge = 0;
const int lhs_add = UseBcast ? ubcast_off[tx] : tx;
const int rhs_add = UseBcast ? ebcast_off[tx] : tx;
for (Idx i = indptr[ty]; i < indptr[ty + 1]; ++i) {
const Idx eid = UseIdx ? _ldg(edge_map + i) : i;
const Idx cid = _ldg(indices + i);
const DType* uoff = BinaryOp::use_lhs ? (ufeat + cid * ufeat_len): nullptr;
const DType* eoff = BinaryOp::use_rhs ? (efeat + eid * efeat_len): nullptr;
DType tmp_out = BinaryOp::Call(uoff + lhs_add, eoff + rhs_add);
ReduceOp::Call(&new_out, &local_argu, &local_arge, tmp_out, cid, eid);
}
// Update output only when max/min values are different that original output
if (out[ty * out_len + tx] != new_out) {
out[ty * out_len + tx] = new_out;
if (ReduceOp::require_arg && BinaryOp::use_lhs) {
arg_u[ty * out_len + tx] = local_argu;
arg_u_ntype[ty * out_len + tx] = src_type;
}
if (ReduceOp::require_arg && BinaryOp::use_rhs) {
arg_e[ty * out_len + tx] = local_arge;
arg_e_etype[ty * out_len + tx] = etype;
}
}
tx += stride_x;
}
ty += stride_y;
}
}
/*!
* \brief CUDA implementation of g-SpMM on Coo format.
* \param bcast Broadcast information.
......@@ -316,6 +371,73 @@ void SpMMCsr(
});
}
/*!
* \brief CUDA kernel of SpMM-Min/Max on Csr format on heterogeneous graph.
* \param bcast Broadcast information.
* \param csr The Csr matrix.
* \param ufeat The feature on source nodes.
* \param efeat The feature on edges.
* \param out The result feature on destination nodes.
* \param argu Arg-Min/Max on source nodes, which refers the source node indices
* correspond to the minimum/maximum values of reduction result on
* destination nodes. It's useful in computing gradients of Min/Max reducer.
* \param arge Arg-Min/Max on edges. which refers the source node indices
* correspond to the minimum/maximum values of reduction result on
* destination nodes. It's useful in computing gradients of Min/Max reducer.
* \param argu_ntype Node type of the arg-Min/Max on source nodes, which refers the
* source node types correspond to the minimum/maximum values of reduction result
* on destination nodes. It's useful in computing gradients of Min/Max reducer.
* \param arge_etype Edge-type of the arg-Min/Max on edges. which refers the source
* node indices correspond to the minimum/maximum values of reduction result on
* destination nodes. It's useful in computing gradients of Min/Max reducer.
* \param src_type Node type of the source nodes of an etype
* \param etype Edge type
*/
template <typename Idx, typename DType,
typename BinaryOp, typename ReduceOp>
void SpMMCmpCsrHetero(
const BcastOff& bcast,
const CSRMatrix& csr,
NDArray ufeat, NDArray efeat,
NDArray out, NDArray argu, NDArray arge,
NDArray argu_ntype, NDArray arge_etype,
const int src_type, const int etype) {
const Idx *indptr = csr.indptr.Ptr<Idx>();
const Idx *indices = csr.indices.Ptr<Idx>();
const Idx *edge_map = csr.data.Ptr<Idx>();
const DType *ufeat_data = ufeat.Ptr<DType>();
const DType *efeat_data = efeat.Ptr<DType>();
DType *out_data = out.Ptr<DType>();
Idx* argu_data = argu.Ptr<Idx>();
Idx* arge_data = arge.Ptr<Idx>();
auto* thr_entry = runtime::CUDAThreadEntry::ThreadLocal();
int64_t *ubcast_off = nullptr, *ebcast_off = nullptr;
int64_t len = bcast.out_len,
lhs_len = bcast.lhs_len,
rhs_len = bcast.rhs_len;
const int ntx = FindNumThreads(len);
const int nty = CUDA_MAX_NUM_THREADS / ntx;
const int nbx = (len + ntx - 1) / ntx;
const int nby = FindNumBlocks<'y'>((csr.num_rows + nty - 1) / nty);
const dim3 nblks(nbx, nby);
const dim3 nthrs(ntx, nty);
const bool use_idx = !IsNullArray(csr.data);
BCAST_IDX_CTX_SWITCH(bcast, use_idx, ufeat->ctx, ubcast_off, ebcast_off, {
CUDA_KERNEL_CALL((SpMMCmpCsrHeteroKernel<Idx, DType, BinaryOp, ReduceOp, UseBcast, UseIdx>),
nblks, nthrs, 0, thr_entry->stream,
ufeat_data, efeat_data, out_data, argu_data, arge_data,
static_cast<Idx*>(argu_ntype->data),
static_cast<Idx*>(arge_etype->data),
indptr, indices, edge_map,
csr.num_rows, csr.num_cols,
ubcast_off, ebcast_off,
lhs_len, rhs_len, len, src_type, etype)
});
}
} // namespace cuda
} // namespace aten
......
......@@ -13,11 +13,9 @@ import test_utils
from test_utils import parametrize_dtype, get_cases
from scipy.sparse import rand
rfuncs = {'sum': fn.sum, 'max': fn.max, 'min': fn.min, 'mean': fn.mean}
fill_value = {'sum': 0, 'max': float("-inf")}
feat_size = 2
@unittest.skipIf(dgl.backend.backend_name != 'pytorch', reason='Only support PyTorch for now')
@unittest.skipIf(F._default_context_str == 'gpu', reason="Max/min reducer not supported on GPU yet.")
def create_test_heterograph(idtype):
# test heterograph from the docstring, plus a user -- wishes -- game relation
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment