"vscode:/vscode.git/clone" did not exist on "ffc58ef43afa0d40da66bd95a4f9e2f97c2153f0"
rowwise_sampling.cu 13.3 KB
Newer Older
1
/**
2
 *  Copyright (c) 2021 by Contributors
3
4
 * @file array/cuda/rowwise_sampling.cu
 * @brief uniform rowwise sampling
5
6
 */

#include <curand_kernel.h>

#include <dgl/random.h>
#include <dgl/runtime/device_api.h>
#include <dgl/runtime/tensordispatch.h>

#include <cub/cub.cuh>

#include <numeric>

#include "../../array/cuda/atomic.cuh"
#include "../../runtime/cuda/cuda_common.h"
#include "./utils.h"

using namespace dgl::cuda;
using namespace dgl::aten::cuda;
using TensorDispatcher = dgl::runtime::TensorDispatcher;

namespace dgl {
namespace aten {
namespace impl {

namespace {

constexpr int BLOCK_SIZE = 128;

/**
 * @brief Compute the size of each row in the sampled CSR, without replacement.
 *
 * @tparam IdType The type of node and edge indexes.
 * @param num_picks The number of non-zero entries to pick per row.
 * @param num_rows The number of rows to pick.
 * @param in_rows The set of rows to pick.
 * @param in_ptr The index where each row's edges start.
 * @param out_deg The size of each row in the sampled matrix, as indexed by
 * `in_rows` (output).
 */
template <typename IdType>
__global__ void _CSRRowWiseSampleDegreeKernel(
    const int64_t num_picks, const int64_t num_rows,
    const IdType* const in_rows, const IdType* const in_ptr,
    IdType* const out_deg) {
  const int tIdx = threadIdx.x + blockIdx.x * blockDim.x;

  if (tIdx < num_rows) {
    const int64_t in_row = in_rows[tIdx];
    const int64_t out_row = tIdx;
    out_deg[out_row] = min(
        static_cast<IdType>(num_picks), in_ptr[in_row + 1] - in_ptr[in_row]);

    if (out_row == num_rows - 1) {
      // make the prefixsum work
      out_deg[num_rows] = 0;
    }
  }
}
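
// Illustrative trace (added, not part of the original source): with
// num_picks = 2, in_rows = {0, 1, 2} and in_ptr = {0, 3, 4, 4}, the row
// degrees are {3, 1, 0}, so the kernel writes out_deg = {2, 1, 0, 0}; the
// trailing zero is the extra slot that lets the later exclusive prefix sum
// produce out_ptr = {0, 2, 3, 3}.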

/**
 * @brief Compute the size of each row in the sampled CSR, with replacement.
 *
 * @tparam IdType The type of node and edge indexes.
 * @param num_picks The number of non-zero entries to pick per row.
 * @param num_rows The number of rows to pick.
 * @param in_rows The set of rows to pick.
 * @param in_ptr The index where each row's edges start.
 * @param out_deg The size of each row in the sampled matrix, as indexed by
 * `in_rows` (output).
 */
template <typename IdType>
__global__ void _CSRRowWiseSampleDegreeReplaceKernel(
    const int64_t num_picks, const int64_t num_rows,
    const IdType* const in_rows, const IdType* const in_ptr,
    IdType* const out_deg) {
  const int tIdx = threadIdx.x + blockIdx.x * blockDim.x;

  if (tIdx < num_rows) {
    const int64_t in_row = in_rows[tIdx];
    const int64_t out_row = tIdx;

    if (in_ptr[in_row + 1] - in_ptr[in_row] == 0) {
      out_deg[out_row] = 0;
    } else {
      out_deg[out_row] = static_cast<IdType>(num_picks);
    }

    if (out_row == num_rows - 1) {
      // make the prefixsum work
      out_deg[num_rows] = 0;
    }
  }
}
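
// Illustrative trace (added): with the same inputs (num_picks = 2,
// in_ptr = {0, 3, 4, 4}), sampling with replacement yields out_deg =
// {2, 2, 0, 0}: every non-empty row contributes exactly num_picks entries
// and empty rows contribute none, so out_ptr becomes {0, 2, 4, 4}.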

/**
 * @brief Perform row-wise uniform sampling on a CSR matrix,
 * and generate a COO matrix, without replacement.
 *
 * @tparam IdType The ID type used for matrices.
 * @tparam TILE_SIZE The number of rows covered by each threadblock.
 * @param rand_seed The random seed to use.
 * @param num_picks The number of non-zeros to pick per row.
 * @param num_rows The number of rows to pick.
 * @param in_rows The set of rows to pick.
 * @param in_ptr The indptr array of the input CSR.
 * @param in_index The indices array of the input CSR.
 * @param data The data array of the input CSR.
 * @param out_ptr The offset to write each row to in the output COO.
 * @param out_rows The rows of the output COO (output).
 * @param out_cols The columns of the output COO (output).
 * @param out_idxs The data array of the output COO (output).
 */
template <typename IdType, int TILE_SIZE>
__global__ void _CSRRowWiseSampleUniformKernel(
    const uint64_t rand_seed, const int64_t num_picks, const int64_t num_rows,
    const IdType* const in_rows, const IdType* const in_ptr,
    const IdType* const in_index, const IdType* const data,
    const IdType* const out_ptr, IdType* const out_rows, IdType* const out_cols,
    IdType* const out_idxs) {
  // we assign one thread block per tile of TILE_SIZE rows
  assert(blockDim.x == BLOCK_SIZE);

  int64_t out_row = blockIdx.x * TILE_SIZE;
  const int64_t last_row =
      min(static_cast<int64_t>(blockIdx.x + 1) * TILE_SIZE, num_rows);

  curandStatePhilox4_32_10_t rng;
  curand_init(rand_seed * gridDim.x + blockIdx.x, threadIdx.x, 0, &rng);

  while (out_row < last_row) {
    const int64_t row = in_rows[out_row];
    const int64_t in_row_start = in_ptr[row];
    const int64_t deg = in_ptr[row + 1] - in_row_start;
    const int64_t out_row_start = out_ptr[out_row];

    if (deg <= num_picks) {
      // just copy the whole row when there are not enough entries to sample
      for (int idx = threadIdx.x; idx < deg; idx += BLOCK_SIZE) {
        const IdType in_idx = in_row_start + idx;
        out_rows[out_row_start + idx] = row;
        out_cols[out_row_start + idx] = in_index[in_idx];
        out_idxs[out_row_start + idx] = data ? data[in_idx] : in_idx;
      }
    } else {
      // generate permutation list via reservoir algorithm
      for (int idx = threadIdx.x; idx < num_picks; idx += BLOCK_SIZE) {
        out_idxs[out_row_start + idx] = idx;
      }
      __syncthreads();

      for (int idx = num_picks + threadIdx.x; idx < deg; idx += BLOCK_SIZE) {
        const int num = curand(&rng) % (idx + 1);
        if (num < num_picks) {
          // use max so as to achieve the replacement order the serial
          // algorithm would have
          AtomicMax(out_idxs + out_row_start + num, idx);
        }
      }
      __syncthreads();

      // copy permutation over
      for (int idx = threadIdx.x; idx < num_picks; idx += BLOCK_SIZE) {
        const IdType perm_idx = out_idxs[out_row_start + idx] + in_row_start;
        out_rows[out_row_start + idx] = row;
        out_cols[out_row_start + idx] = in_index[perm_idx];
        out_idxs[out_row_start + idx] = data ? data[perm_idx] : perm_idx;
      }
    }
    out_row += 1;
  }
}
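
// Illustrative trace of the reservoir step (added): with num_picks = 2 and
// deg = 4, out_idxs for the row starts as {0, 1}. Each candidate idx in
// {2, 3} draws num = curand() % (idx + 1); when num < num_picks, AtomicMax
// keeps the larger idx in slot num. Drawing num = 1 for both candidates
// turns {0, 1} into {0, 2} and then {0, 3}, the same result the serial
// reservoir algorithm would produce for those draws.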

/**
 * @brief Perform row-wise uniform sampling on a CSR matrix,
 * and generate a COO matrix, with replacement.
 *
 * @tparam IdType The ID type used for matrices.
 * @tparam TILE_SIZE The number of rows covered by each threadblock.
 * @param rand_seed The random seed to use.
 * @param num_picks The number of non-zeros to pick per row.
 * @param num_rows The number of rows to pick.
 * @param in_rows The set of rows to pick.
 * @param in_ptr The indptr array of the input CSR.
 * @param in_index The indices array of the input CSR.
 * @param data The data array of the input CSR.
 * @param out_ptr The offset to write each row to in the output COO.
 * @param out_rows The rows of the output COO (output).
 * @param out_cols The columns of the output COO (output).
 * @param out_idxs The data array of the output COO (output).
 */
template <typename IdType, int TILE_SIZE>
__global__ void _CSRRowWiseSampleUniformReplaceKernel(
    const uint64_t rand_seed, const int64_t num_picks, const int64_t num_rows,
    const IdType* const in_rows, const IdType* const in_ptr,
    const IdType* const in_index, const IdType* const data,
    const IdType* const out_ptr, IdType* const out_rows, IdType* const out_cols,
    IdType* const out_idxs) {
  // we assign one thread block per tile of TILE_SIZE rows
  assert(blockDim.x == BLOCK_SIZE);

  int64_t out_row = blockIdx.x * TILE_SIZE;
  const int64_t last_row =
      min(static_cast<int64_t>(blockIdx.x + 1) * TILE_SIZE, num_rows);

  curandStatePhilox4_32_10_t rng;
  curand_init(rand_seed * gridDim.x + blockIdx.x, threadIdx.x, 0, &rng);

  while (out_row < last_row) {
    const int64_t row = in_rows[out_row];
    const int64_t in_row_start = in_ptr[row];
    const int64_t out_row_start = out_ptr[out_row];
    const int64_t deg = in_ptr[row + 1] - in_row_start;

    if (deg > 0) {
      // each thread independently draws edges with replacement; empty rows
      // produce no output
      for (int idx = threadIdx.x; idx < num_picks; idx += BLOCK_SIZE) {
        const int64_t edge = curand(&rng) % deg;
        const int64_t out_idx = out_row_start + idx;
        out_rows[out_idx] = row;
        out_cols[out_idx] = in_index[in_row_start + edge];
        out_idxs[out_idx] =
            data ? data[in_row_start + edge] : in_row_start + edge;
      }
    }
    out_row += 1;
  }
}
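
// Illustrative trace (added): with num_picks = 4 and deg = 3, each of the
// four output slots independently draws curand() % 3, so a row may be
// sampled as e.g. {2, 0, 2, 1}; duplicate edges are expected when sampling
// with replacement.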

}  // namespace

///////////////////////////// CSR sampling //////////////////////////

template <DGLDeviceType XPU, typename IdType>
COOMatrix _CSRRowWiseSamplingUniform(
    CSRMatrix mat, IdArray rows, const int64_t num_picks, const bool replace) {
  const auto& ctx = rows->ctx;
  auto device = runtime::DeviceAPI::Get(ctx);
  cudaStream_t stream = runtime::getCurrentCUDAStream();

  const int64_t num_rows = rows->shape[0];
  const IdType* const slice_rows = static_cast<const IdType*>(rows->data);

  IdArray picked_row =
      NewIdArray(num_rows * num_picks, ctx, sizeof(IdType) * 8);
  IdArray picked_col =
      NewIdArray(num_rows * num_picks, ctx, sizeof(IdType) * 8);
  IdArray picked_idx =
      NewIdArray(num_rows * num_picks, ctx, sizeof(IdType) * 8);
  IdType* const out_rows = static_cast<IdType*>(picked_row->data);
  IdType* const out_cols = static_cast<IdType*>(picked_col->data);
  IdType* const out_idxs = static_cast<IdType*>(picked_idx->data);

  const IdType* in_ptr = static_cast<IdType*>(GetDevicePointer(mat.indptr));
  const IdType* in_cols = static_cast<IdType*>(GetDevicePointer(mat.indices));
  const IdType* data = CSRHasData(mat)
                           ? static_cast<IdType*>(GetDevicePointer(mat.data))
                           : nullptr;

  // compute degree
  IdType* out_deg = static_cast<IdType*>(
      device->AllocWorkspace(ctx, (num_rows + 1) * sizeof(IdType)));

  if (replace) {
    const dim3 block(512);
    const dim3 grid((num_rows + block.x - 1) / block.x);
    CUDA_KERNEL_CALL(
        _CSRRowWiseSampleDegreeReplaceKernel, grid, block, 0, stream, num_picks,
        num_rows, slice_rows, in_ptr, out_deg);
  } else {
    const dim3 block(512);
    const dim3 grid((num_rows + block.x - 1) / block.x);
    CUDA_KERNEL_CALL(
        _CSRRowWiseSampleDegreeKernel, grid, block, 0, stream, num_picks,
        num_rows, slice_rows, in_ptr, out_deg);
  }

  // fill out_ptr
  IdType* out_ptr = static_cast<IdType*>(
      device->AllocWorkspace(ctx, (num_rows + 1) * sizeof(IdType)));
  size_t prefix_temp_size = 0;
  CUDA_CALL(cub::DeviceScan::ExclusiveSum(
      nullptr, prefix_temp_size, out_deg, out_ptr, num_rows + 1, stream));
  void* prefix_temp = device->AllocWorkspace(ctx, prefix_temp_size);
  CUDA_CALL(cub::DeviceScan::ExclusiveSum(
      prefix_temp, prefix_temp_size, out_deg, out_ptr, num_rows + 1, stream));
  device->FreeWorkspace(ctx, prefix_temp);
  device->FreeWorkspace(ctx, out_deg);
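
  // Note (added): cub::DeviceScan::ExclusiveSum is deliberately called
  // twice above; the first call, with a null temporary buffer, only
  // reports the required prefix_temp_size, and the second call performs
  // the actual scan. This two-phase sizing pattern is the standard idiom
  // for CUB device-wide primitives.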

  cudaEvent_t copyEvent;
  CUDA_CALL(cudaEventCreate(&copyEvent));

  NDArray new_len_tensor;
  if (TensorDispatcher::Global()->IsAvailable()) {
    new_len_tensor = NDArray::PinnedEmpty(
        {1}, DGLDataTypeTraits<IdType>::dtype, DGLContext{kDGLCPU, 0});
  } else {
    // use pageable memory, it will unnecessarily block but be functional
    new_len_tensor = NDArray::Empty(
        {1}, DGLDataTypeTraits<IdType>::dtype, DGLContext{kDGLCPU, 0});
  }
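
  // Note (added): the pinned (page-locked) buffer above lets the
  // cudaMemcpyAsync below proceed asynchronously with respect to the host;
  // with pageable memory the copy is staged by the driver and may block
  // the calling thread.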

  // copy using the internal current stream
  CUDA_CALL(cudaMemcpyAsync(
      new_len_tensor->data, out_ptr + num_rows, sizeof(IdType),
      cudaMemcpyDeviceToHost, stream));
  CUDA_CALL(cudaEventRecord(copyEvent, stream));

  const uint64_t random_seed = RandomEngine::ThreadLocal()->RandInt(1000000000);

  // select edges
  // the number of rows each thread block will cover
  constexpr int TILE_SIZE = 128 / BLOCK_SIZE;
  if (replace) {  // with replacement
    const dim3 block(BLOCK_SIZE);
    const dim3 grid((num_rows + TILE_SIZE - 1) / TILE_SIZE);
    CUDA_KERNEL_CALL(
        (_CSRRowWiseSampleUniformReplaceKernel<IdType, TILE_SIZE>), grid, block,
        0, stream, random_seed, num_picks, num_rows, slice_rows, in_ptr,
        in_cols, data, out_ptr, out_rows, out_cols, out_idxs);
  } else {  // without replacement
    const dim3 block(BLOCK_SIZE);
    const dim3 grid((num_rows + TILE_SIZE - 1) / TILE_SIZE);
    CUDA_KERNEL_CALL(
        (_CSRRowWiseSampleUniformKernel<IdType, TILE_SIZE>), grid, block, 0,
        stream, random_seed, num_picks, num_rows, slice_rows, in_ptr, in_cols,
        data, out_ptr, out_rows, out_cols, out_idxs);
  }
  device->FreeWorkspace(ctx, out_ptr);

  // wait for copying `new_len` to finish
  CUDA_CALL(cudaEventSynchronize(copyEvent));
  CUDA_CALL(cudaEventDestroy(copyEvent));

  const IdType new_len = static_cast<const IdType*>(new_len_tensor->data)[0];
  picked_row = picked_row.CreateView({new_len}, picked_row->dtype);
  picked_col = picked_col.CreateView({new_len}, picked_col->dtype);
  picked_idx = picked_idx.CreateView({new_len}, picked_idx->dtype);

  return COOMatrix(
      mat.num_rows, mat.num_cols, picked_row, picked_col, picked_idx);
}

template <DGLDeviceType XPU, typename IdType>
COOMatrix CSRRowWiseSamplingUniform(
    CSRMatrix mat, IdArray rows, const int64_t num_picks, const bool replace) {
  if (num_picks == -1) {
    // Basically this is UnitGraph::InEdges().
    COOMatrix coo = CSRToCOO(CSRSliceRows(mat, rows), false);
    IdArray sliced_rows = IndexSelect(rows, coo.row);
    return COOMatrix(
        mat.num_rows, mat.num_cols, sliced_rows, coo.col, coo.data);
  } else {
    return _CSRRowWiseSamplingUniform<XPU, IdType>(
        mat, rows, num_picks, replace);
  }
}

template COOMatrix CSRRowWiseSamplingUniform<kDGLCUDA, int32_t>(
    CSRMatrix, IdArray, int64_t, bool);
template COOMatrix CSRRowWiseSamplingUniform<kDGLCUDA, int64_t>(
    CSRMatrix, IdArray, int64_t, bool);

}  // namespace impl
}  // namespace aten
}  // namespace dgl
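
// Illustrative usage (added sketch; `csr` and `seed_nodes` are hypothetical
// names for a device-resident CSRMatrix and an IdArray of seed node IDs):
//
//   using namespace dgl;
//   aten::COOMatrix sub =
//       aten::impl::CSRRowWiseSamplingUniform<kDGLCUDA, int64_t>(
//           csr, seed_nodes, /*num_picks=*/10, /*replace=*/false);
//
// Passing num_picks == -1 returns all in-edges of the seed rows instead of
// sampling.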