/**
 *  Copyright (c) 2017-2022 by Contributors
 * @file cuda_device_api.cc
 * @brief GPU specific API
 */
#include <cuda_runtime.h>
#include <dgl/runtime/device_api.h>
#include <dgl/runtime/registry.h>
#include <dgl/runtime/tensordispatch.h>
#include <dmlc/thread_local.h>

#include "cuda_common.h"

namespace dgl {
namespace runtime {

class CUDADeviceAPI final : public DeviceAPI {
 public:
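  // Probe for CUDA devices at construction time. Any error returned by
  // cudaGetDeviceCount (e.g. a missing driver) is treated as "no devices",
  // and the sticky error state is cleared via cudaGetLastError().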
  CUDADeviceAPI() {
    int count;
    auto err = cudaGetDeviceCount(&count);
    switch (err) {
      case cudaSuccess:
        break;
      default:
        count = 0;
        cudaGetLastError();
    }
    is_available_ = count > 0;
  }

  bool IsAvailable() final { return is_available_; }

  void SetDevice(DGLContext ctx) final {
    CUDA_CALL(cudaSetDevice(ctx.device_id));
  }
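  // Query a single device attribute. kExist merely checks whether an
  // attribute query succeeds for the given device id; compound attributes
  // (compute version, device name, max thread dimensions) are returned as
  // strings instead of a plain int.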
  void GetAttr(DGLContext ctx, DeviceAttrKind kind, DGLRetValue* rv) final {
    int value = 0;
    switch (kind) {
      case kExist:
        value =
            (cudaDeviceGetAttribute(
                 &value, cudaDevAttrMaxThreadsPerBlock, ctx.device_id) ==
             cudaSuccess);
        break;
      case kMaxThreadsPerBlock: {
        CUDA_CALL(cudaDeviceGetAttribute(
            &value, cudaDevAttrMaxThreadsPerBlock, ctx.device_id));
        break;
      }
      case kWarpSize: {
        CUDA_CALL(
            cudaDeviceGetAttribute(&value, cudaDevAttrWarpSize, ctx.device_id));
        break;
      }
      case kMaxSharedMemoryPerBlock: {
        CUDA_CALL(cudaDeviceGetAttribute(
            &value, cudaDevAttrMaxSharedMemoryPerBlock, ctx.device_id));
        break;
      }
      case kComputeVersion: {
        std::ostringstream os;
        CUDA_CALL(cudaDeviceGetAttribute(
            &value, cudaDevAttrComputeCapabilityMajor, ctx.device_id));
        os << value << ".";
        CUDA_CALL(cudaDeviceGetAttribute(
            &value, cudaDevAttrComputeCapabilityMinor, ctx.device_id));
        os << value;
        *rv = os.str();
        return;
      }
      case kDeviceName: {
        cudaDeviceProp props;
        CUDA_CALL(cudaGetDeviceProperties(&props, ctx.device_id));
        *rv = std::string(props.name);
        return;
      }
      case kMaxClockRate: {
        CUDA_CALL(cudaDeviceGetAttribute(
            &value, cudaDevAttrClockRate, ctx.device_id));
        break;
      }
      case kMultiProcessorCount: {
        CUDA_CALL(cudaDeviceGetAttribute(
            &value, cudaDevAttrMultiProcessorCount, ctx.device_id));
        break;
      }
      case kMaxThreadDimensions: {
        int dims[3];
        CUDA_CALL(cudaDeviceGetAttribute(
            &dims[0], cudaDevAttrMaxBlockDimX, ctx.device_id));
        CUDA_CALL(cudaDeviceGetAttribute(
            &dims[1], cudaDevAttrMaxBlockDimY, ctx.device_id));
        CUDA_CALL(cudaDeviceGetAttribute(
            &dims[2], cudaDevAttrMaxBlockDimZ, ctx.device_id));

        std::stringstream ss;  // use json string to return multiple int values;
        ss << "[" << dims[0] << ", " << dims[1] << ", " << dims[2] << "]";
        *rv = ss.str();
        return;
      }
    }
    *rv = value;
  }
  void* AllocDataSpace(
      DGLContext ctx, size_t nbytes, size_t alignment,
      DGLDataType type_hint) final {
    SetDevice(ctx);
    // Redirect to PyTorch's allocator when available.
    TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
    if (tensor_dispatcher->IsAvailable()) {
      return tensor_dispatcher->CUDAAllocWorkspace(
          nbytes, getCurrentCUDAStream());
    }
    CHECK_EQ(256 % alignment, 0U) << "CUDA space is aligned at 256 bytes";
    void* ret;
    CUDA_CALL(cudaMalloc(&ret, nbytes));
    return ret;
  }

  void FreeDataSpace(DGLContext ctx, void* ptr) final {
    SetDevice(ctx);
    TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
    if (tensor_dispatcher->IsAvailable()) {
      return tensor_dispatcher->CUDAFreeWorkspace(ptr);
    }
    CUDA_CALL(cudaFree(ptr));
  }

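  // Copy between host and device buffers on the given stream. Same-device
  // copies use cudaMemcpyAsync; copies between two GPUs go through
  // cudaMemcpyPeerAsync.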
  void CopyDataFromTo(
      const void* from, size_t from_offset, void* to, size_t to_offset,
      size_t size, DGLContext ctx_from, DGLContext ctx_to,
      DGLDataType type_hint, DGLStreamHandle stream) {
    cudaStream_t cu_stream = static_cast<cudaStream_t>(stream);
    from = static_cast<const char*>(from) + from_offset;
    to = static_cast<char*>(to) + to_offset;
    if (ctx_from.device_type == kDGLCUDA && ctx_to.device_type == kDGLCUDA) {
      CUDA_CALL(cudaSetDevice(ctx_from.device_id));
      if (ctx_from.device_id == ctx_to.device_id) {
        GPUCopy(from, to, size, cudaMemcpyDeviceToDevice, cu_stream);
      } else {
        CUDA_CALL(cudaMemcpyPeerAsync(
            to, ctx_to.device_id, from, ctx_from.device_id, size, cu_stream));
      }
    } else if (
        ctx_from.device_type == kDGLCUDA && ctx_to.device_type == kDGLCPU) {
      CUDA_CALL(cudaSetDevice(ctx_from.device_id));
      GPUCopy(from, to, size, cudaMemcpyDeviceToHost, cu_stream);
    } else if (
        ctx_from.device_type == kDGLCPU && ctx_to.device_type == kDGLCUDA) {
      CUDA_CALL(cudaSetDevice(ctx_to.device_id));
      GPUCopy(from, to, size, cudaMemcpyHostToDevice, cu_stream);
    } else {
      LOG(FATAL) << "expected copy from/to GPU or between GPUs";
    }
  }

  void CopyDataFromTo(
      const void* from, size_t from_offset, void* to, size_t to_offset,
      size_t size, DGLContext ctx_from, DGLContext ctx_to,
      DGLDataType type_hint) final {
    auto stream = GetStream();
    CopyDataFromTo(
        from, from_offset, to, to_offset, size, ctx_from, ctx_to, type_hint,
        stream);
  }

  // To ensure correct behavior, `record_event` must be invoked anytime a
  // pointer from PyTorch CachingHostAllocator is used in a cudaMemcpyAsync
  // call. It provides a way to re-use freed pinned (page-locked) memory
  // allocations and avoid device sync due to cudaFreeHost calls.
  void RecordedCopyDataFromTo(
      void* from, size_t from_offset, void* to, size_t to_offset, size_t size,
      DGLContext ctx_from, DGLContext ctx_to, DGLDataType type_hint,
      void* pytorch_ctx) final {
    auto stream = GetStream();
    CopyDataFromTo(
        from, from_offset, to, to_offset, size, ctx_from, ctx_to, type_hint,
        stream);
    auto tensor_dispatcher = TensorDispatcher::Global();
    if (tensor_dispatcher->IsAvailable()) {
      auto custream = static_cast<cudaStream_t>(stream);
      void* ptr = ctx_to.device_type == kDGLCPU ? to : from;
      int id =
          ctx_to.device_type == kDGLCPU ? ctx_from.device_id : ctx_to.device_id;
      tensor_dispatcher->CUDARecordHostAlloc(ptr, pytorch_ctx, custream, id);
    }
  }

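  // Create a stream on the given device; pair it with FreeStream() below.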
  DGLStreamHandle CreateStream(DGLContext ctx) {
    CUDA_CALL(cudaSetDevice(ctx.device_id));
    cudaStream_t retval;
    // make sure the legacy default stream won't block on this stream
    CUDA_CALL(cudaStreamCreateWithFlags(&retval, cudaStreamNonBlocking));
    return static_cast<DGLStreamHandle>(retval);
  }

  void FreeStream(DGLContext ctx, DGLStreamHandle stream) {
    CUDA_CALL(cudaSetDevice(ctx.device_id));
    cudaStream_t cu_stream = static_cast<cudaStream_t>(stream);
    CUDA_CALL(cudaStreamDestroy(cu_stream));
  }

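  // Make the destination stream wait for all work currently enqueued on the
  // source stream by recording a temporary event; the host is not blocked.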
  void SyncStreamFromTo(
      DGLContext ctx, DGLStreamHandle event_src, DGLStreamHandle event_dst) {
    CUDA_CALL(cudaSetDevice(ctx.device_id));
    cudaStream_t src_stream = static_cast<cudaStream_t>(event_src);
    cudaStream_t dst_stream = static_cast<cudaStream_t>(event_dst);
    cudaEvent_t evt;
    CUDA_CALL(cudaEventCreate(&evt));
    CUDA_CALL(cudaEventRecord(evt, src_stream));
    CUDA_CALL(cudaStreamWaitEvent(dst_stream, evt, 0));
    CUDA_CALL(cudaEventDestroy(evt));
  }

  void StreamSync(DGLContext ctx, DGLStreamHandle stream) final {
    CUDA_CALL(cudaSetDevice(ctx.device_id));
    CUDA_CALL(cudaStreamSynchronize(static_cast<cudaStream_t>(stream)));
  }

  /** NOTE: If the backend is PyTorch, we will use PyTorch's stream management,
   *        so just avoid calling our SetStream/CreateStream unless
   *        you really need advanced stream control.
   * TODO(Xin): Redirect this to PyTorch or remove it.
   * PyTorch allows external CUDA streams to be set as current since v1.11.
   */
  void SetStream(DGLContext ctx, DGLStreamHandle stream) final {}

  DGLStreamHandle GetStream() const final {
    return static_cast<DGLStreamHandle>(getCurrentCUDAStream());
  }

  /** NOTE: cudaHostRegister can be called from an arbitrary GPU device,
   *        so we don't need to specify a ctx.
   *        The pinned memory can be seen by all CUDA contexts,
   *        not just the one that performed the allocation
   */
  bool PinData(void* ptr, size_t nbytes) override {
    // prevent users from pinning empty tensors or graphs
    if (ptr == nullptr || nbytes == 0) return false;
    TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
    // Minimize the pinned memory pool allocated by the backend (via
    // tensoradapter) to preserve enough memory for DGL's own in-place
    // pin-memory operation.
    if (tensor_dispatcher->IsAvailable()) {
      tensor_dispatcher->CUDAHostAllocatorEmptyCache();
    }
    CUDA_CALL(cudaHostRegister(ptr, nbytes, cudaHostRegisterDefault));
    return true;
  }

  void UnpinData(void* ptr) {
    if (ptr == nullptr) return;
    CUDA_CALL(cudaHostUnregister(ptr));
  }

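  // Allocate pinned (page-locked) host memory through the backend's
  // CachingHostAllocator (requires PyTorch 1.11+); `ctx` and `deleter`
  // receive opaque allocator handles.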
  void* AllocPinnedDataSpace(
      size_t nbytes, void** ctx, void** deleter) override {
    // prevent pinning empty tensors or graphs
    if (nbytes == 0) return nullptr;
    TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
    CHECK(tensor_dispatcher->IsAvailable())
        << "CachingHostAllocator is not available in the current backend "
           "PyTorch. Please update the PyTorch version to 1.11+";
    return tensor_dispatcher->CUDAAllocHostWorkspace(nbytes, ctx, deleter);
  }

  void FreePinnedDataSpace(void** deleter) override {
    TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
    CHECK(tensor_dispatcher->IsAvailable())
        << "CachingHostAllocator is not available in the current backend "
           "PyTorch. Please update the PyTorch version to 1.11+";
    tensor_dispatcher->CUDAFreeHostWorkspace(deleter);
  }

  bool IsPinned(const void* ptr) override {
    // can't be a pinned tensor if CUDA context is unavailable.
    if (!is_available_) return false;

    cudaPointerAttributes attr;
    cudaError_t status = cudaPointerGetAttributes(&attr, ptr);
    bool result = false;

    switch (status) {
      case cudaErrorInvalidValue:
        // might be a normal CPU tensor on CUDA 10.2 or earlier
        cudaGetLastError();  // clear error
        break;
      case cudaSuccess:
        result = (attr.type == cudaMemoryTypeHost);
        break;
      case cudaErrorInitializationError:
      case cudaErrorNoDevice:
      case cudaErrorInsufficientDriver:
      case cudaErrorInvalidDevice:
        // We don't want to fail in these particular cases since this function
        // can be called when users only want to run on CPU even if CUDA API is
        // enabled, or in a forked subprocess where CUDA context cannot be
        // initialized.  So we just mark the CUDA context to unavailable and
        // return.
        is_available_ = false;
        cudaGetLastError();  // clear error
        break;
      default:
        LOG(FATAL) << "error while determining memory status: "
                   << cudaGetErrorString(status);
        break;
    }

    return result;
  }

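  // Scratch-space allocation: prefer PyTorch's caching allocator when the
  // tensoradapter is available, otherwise fall back to DGL's per-thread
  // workspace pool.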
  void* AllocWorkspace(
      DGLContext ctx, size_t size, DGLDataType type_hint) final {
    SetDevice(ctx);
    // Redirect to PyTorch's allocator when available.
    TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
    if (tensor_dispatcher->IsAvailable())
      return tensor_dispatcher->CUDAAllocWorkspace(
          size, getCurrentCUDAStream());

    return CUDAThreadEntry::ThreadLocal()->pool.AllocWorkspace(ctx, size);
  }

  void FreeWorkspace(DGLContext ctx, void* data) final {
    SetDevice(ctx);
    TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
    if (tensor_dispatcher->IsAvailable())
      return tensor_dispatcher->CUDAFreeWorkspace(data);

    CUDAThreadEntry::ThreadLocal()->pool.FreeWorkspace(ctx, data);
  }

  static const std::shared_ptr<CUDADeviceAPI>& Global() {
    static std::shared_ptr<CUDADeviceAPI> inst =
        std::make_shared<CUDADeviceAPI>();
    return inst;
  }

 private:
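  // Asynchronous memcpy helper; synchronizes only for device-to-host copies
  // issued on the legacy default stream so the caller can safely read the
  // destination buffer.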
  static void GPUCopy(
      const void* from, void* to, size_t size, cudaMemcpyKind kind,
      cudaStream_t stream) {
    CUDA_CALL(cudaMemcpyAsync(to, from, size, kind, stream));
    if (stream == 0 && kind == cudaMemcpyDeviceToHost) {
      // only wait for the copy when it is on the default stream and the
      // destination is host memory
      CUDA_CALL(cudaStreamSynchronize(stream));
    }
  }

  bool is_available_ = true;
};

typedef dmlc::ThreadLocalStore<CUDAThreadEntry> CUDAThreadStore;

CUDAThreadEntry::CUDAThreadEntry() : pool(kDGLCUDA, CUDADeviceAPI::Global()) {}

CUDAThreadEntry* CUDAThreadEntry::ThreadLocal() {
  return CUDAThreadStore::Get();
}

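// Returns the CUDA stream DGL kernels should be launched on: PyTorch's
// current stream when the tensoradapter bridge is available, otherwise the
// legacy default stream (nullptr).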
cudaStream_t getCurrentCUDAStream() {
  TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
  if (tensor_dispatcher->IsAvailable())
    return tensor_dispatcher->CUDAGetCurrentStream();
  else  // return the default stream when TA is not available
    return nullptr;
}

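// Expose the singleton through DGL's global registry under "device_api.cuda".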
DGL_REGISTER_GLOBAL("device_api.cuda")
    .set_body([](DGLArgs args, DGLRetValue* rv) {
      DeviceAPI* ptr = CUDADeviceAPI::Global().get();
      *rv = static_cast<void*>(ptr);
    });

}  // namespace runtime
}  // namespace dgl