"docs/git@developer.sourcefind.cn:OpenDAS/vision.git" did not exist on "21790df9a4f77bd9ec4db44de04594cb539457a7"
Unverified Commit f673fc25 authored by nv-dlasalle's avatar nv-dlasalle Committed by GitHub
Browse files

[Dataloading] Add class for copying tensors to/from the GPU on a non-default stream (#2284)

* Add async transferer class

* Add async ndarray copy interface

* Add python bindings

* Fix comment

* Add python class

* Fix linting issues

* Add python unit test

* Update python interface

* move async_transferer to cuda only directory

* Fix linting issue

* Move out of contrib

* Add doc strings

* Move test compute from backend

* Update comment

* Fix test naming

* Fix argument usage

* Wrap/unwrap backend parameters

* Move to dataloading

* Move to 'dataloading'

* Make GPU/CPU compatible

* Fix unit tests

* Add docs

* Use only backend interface for datamovement in unit test
parent 269983db
...@@ -115,6 +115,7 @@ file(GLOB DGL_SRC ...@@ -115,6 +115,7 @@ file(GLOB DGL_SRC
src/runtime/*.cc src/runtime/*.cc
src/geometry/*.cc src/geometry/*.cc
src/geometry/cpu/*.cc src/geometry/cpu/*.cc
src/dataloading/*.cc
) )
file(GLOB_RECURSE DGL_SRC_1 file(GLOB_RECURSE DGL_SRC_1
......
...@@ -48,3 +48,17 @@ to generate negative edges. ...@@ -48,3 +48,17 @@ to generate negative edges.
.. autoclass:: Uniform .. autoclass:: Uniform
:members: __call__ :members: __call__
Async Copying to/from GPUs
--------------------------
.. currentmodule:: dgl.dataloading
Data can be copied from the CPU to the GPU, or from the GPU to the CPU,
while the GPU is being used for
computation, using the :class:`AsyncTransferer`.
.. autoclass:: AsyncTransferer
:members: __init__, async_copy
.. autoclass:: async_transferer.Transfer
:members: wait
...@@ -139,11 +139,15 @@ class NDArray { ...@@ -139,11 +139,15 @@ class NDArray {
/*! /*!
* \brief Copy data content from another array. * \brief Copy data content from another array.
* \param other The source array to be copied from. * \param other The source array to be copied from.
* \param stream The stream to perform the copy on if it involves a GPU
* context, otherwise this parameter is ignored.
* \note The copy may happen asynchrously if it involves a GPU context. * \note The copy may happen asynchrously if it involves a GPU context.
* DGLSynchronize is necessary. * DGLSynchronize is necessary.
*/ */
inline void CopyFrom(DLTensor* other); inline void CopyFrom(DLTensor* other,
inline void CopyFrom(const NDArray& other); DGLStreamHandle stream = nullptr);
inline void CopyFrom(const NDArray& other,
DGLStreamHandle stream = nullptr);
/*! /*!
* \brief Copy data content into another array. * \brief Copy data content into another array.
* \param other The source array to be copied from. * \param other The source array to be copied from.
...@@ -384,15 +388,17 @@ inline void NDArray::reset() { ...@@ -384,15 +388,17 @@ inline void NDArray::reset() {
} }
} }
inline void NDArray::CopyFrom(DLTensor* other) { inline void NDArray::CopyFrom(DLTensor* other,
DGLStreamHandle stream) {
CHECK(data_ != nullptr); CHECK(data_ != nullptr);
CopyFromTo(other, &(data_->dl_tensor)); CopyFromTo(other, &(data_->dl_tensor), stream);
} }
inline void NDArray::CopyFrom(const NDArray& other) { inline void NDArray::CopyFrom(const NDArray& other,
DGLStreamHandle stream) {
CHECK(data_ != nullptr); CHECK(data_ != nullptr);
CHECK(other.data_ != nullptr); CHECK(other.data_ != nullptr);
CopyFromTo(&(other.data_->dl_tensor), &(data_->dl_tensor)); CopyFromTo(&(other.data_->dl_tensor), &(data_->dl_tensor), stream);
} }
inline void NDArray::CopyTo(DLTensor* other) const { inline void NDArray::CopyTo(DLTensor* other) const {
......
...@@ -18,6 +18,7 @@ from .neighbor import * ...@@ -18,6 +18,7 @@ from .neighbor import *
from .dataloader import * from .dataloader import *
from . import negative_sampler from . import negative_sampler
from .async_transferer import AsyncTransferer
from .. import backend as F from .. import backend as F
......
""" API for transferring data to/from the GPU over second stream.A """
from .. import backend as F
from .. import ndarray
from .. import utils
from .._ffi.function import _init_api
class Transfer(object):
    """ A handle to a single in-flight asynchronous transfer. """

    def __init__(self, transfer_id, handle):
        """ Create a new Transfer object.

        Parameters
        ----------
        transfer_id : int
            The id of the asynchronous transfer.
        handle : DGLAsyncTransferer
            The handle of the DGLAsyncTransferer object that initiated the
            transfer.
        """
        self._transfer_id = transfer_id
        self._handle = handle

    def wait(self):
        """ Wait for this transfer to finish, and return the result.

        Returns
        -------
        Tensor
            The new tensor on the target context.
        """
        # Block until the C++ side reports completion, then convert the
        # resulting DGL NDArray back to a backend tensor without copying.
        result = _CAPI_DGLAsyncTransfererWait(self._handle, self._transfer_id)
        return F.zerocopy_from_dgl_ndarray(result)
class AsyncTransferer(object):
    """ Class for initiating asynchronous copies to/from the GPU on a second
    GPU stream. """

    def __init__(self, device):
        """ Create a new AsyncTransferer object.

        Parameters
        ----------
        device : Device or context object.
            The context in which the second stream will be created. Must be a
            GPU context for the copy to be asynchronous.
        """
        # Accept either a DGLContext directly or a backend device object.
        if not isinstance(device, ndarray.DGLContext):
            device = utils.to_dgl_context(device)
        self._handle = _CAPI_DGLAsyncTransfererCreate(device)

    def async_copy(self, tensor, device):
        """ Initiate an asynchronous copy on the internal stream.

        Parameters
        ----------
        tensor : Tensor
            The tensor to transfer.
        device : Device or context object.
            The context to transfer to.

        Returns
        -------
        Transfer
            A Transfer object that can be waited on to get the tensor in the
            new context.
        """
        if not isinstance(device, ndarray.DGLContext):
            device = utils.to_dgl_context(device)
        dgl_tensor = F.zerocopy_to_dgl_ndarray(tensor)
        transfer_id = _CAPI_DGLAsyncTransfererStartTransfer(
            self._handle, dgl_tensor, device)
        return Transfer(transfer_id, self._handle)
_init_api("dgl.dataloading.async_transferer")
/*!
* Copyright (c) 2020 by Contributors
 * \file dataloading/async_transferer.cc
* \brief The AsyncTransferer implementation.
*/
#include "async_transferer.h"
#include <dgl/packed_func_ext.h>
#include <dgl/runtime/registry.h>
#include <dgl/runtime/device_api.h>
#include <vector>
#include <utility>
#ifdef DGL_USE_CUDA
#include <cuda_runtime.h>
#include "../runtime/cuda/cuda_common.h"
#endif
namespace dgl {
using namespace runtime;
namespace dataloading {
using TransferId = AsyncTransferer::TransferId;
/*!
 * \brief RAII wrapper around the CUDA event used to track completion of an
 * asynchronous copy. The member is only present when compiled with CUDA.
 */
struct AsyncTransferer::Event {
#ifdef DGL_USE_CUDA
  cudaEvent_t id;
  // NOTE(review): this destructor assumes `id` was created with
  // cudaEventCreate() -- destroying an uninitialized event is undefined;
  // confirm every construction site creates the event before any path that
  // can destroy this object. Also note CUDA_CALL may abort on error, which
  // here happens inside a destructor.
  ~Event() {
    CUDA_CALL(cudaEventDestroy(id));
  }
#endif
};
/*!
 * \brief Construct a transferer for `ctx`. A dedicated stream is only
 * created for GPU contexts; on the CPU all copies run synchronously.
 */
AsyncTransferer::AsyncTransferer(DGLContext ctx)
    : ctx_(ctx),
      next_id_(0),
      transfers_(),
      stream_(ctx.device_type == kDLGPU
                  ? DeviceAPI::Get(ctx)->CreateStream(ctx)
                  : nullptr) {
}
AsyncTransferer::~AsyncTransferer() {
  // Release the dedicated copy stream, if one was created (GPU contexts only).
  if (stream_ != nullptr) {
    DeviceAPI::Get(ctx_)->FreeStream(ctx_, stream_);
  }
}
/*!
 * \brief Begin copying `src` to `dst_ctx`. With a GPU stream the copy is
 * issued asynchronously and an event is recorded after it; without one
 * (CPU context) the copy completes before this function returns.
 */
TransferId AsyncTransferer::StartTransfer(NDArray src, DGLContext dst_ctx) {
  const TransferId id = GenerateId();

  // Allocate the destination with the same shape and dtype as the source.
  const std::vector<int64_t> shape(src->shape, src->shape + src->ndim);

  Transfer transfer;
  transfer.src = src;
  transfer.dst = NDArray::Empty(shape, src->dtype, dst_ctx);

  if (stream_ == nullptr) {
    // No stream available (CPU context): copy synchronously, no event needed.
    transfer.event.reset(nullptr);
    transfer.dst.CopyFrom(transfer.src);
  } else {
#ifdef DGL_USE_CUDA
    // Issue the copy on the dedicated stream and record an event right after
    // it, so Wait() can block until exactly this copy has finished.
    transfer.event.reset(new Event);
    CUDA_CALL(cudaEventCreate(&transfer.event->id));
    transfer.dst.CopyFrom(transfer.src, stream_);
    CUDA_CALL(cudaEventRecord(transfer.event->id,
                              static_cast<cudaStream_t>(stream_)));
#else
    LOG(FATAL) << "GPU support not compiled.";
#endif
  }

  transfers_.emplace(id, std::move(transfer));
  return id;
}
/*!
 * \brief Block until the transfer `id` has finished and return its result.
 * Aborts via CHECK if `id` is unknown (or already waited on).
 */
NDArray AsyncTransferer::Wait(const TransferId id) {
  auto iter = transfers_.find(id);
  CHECK(iter != transfers_.end()) << "Unknown transfer: " << id;

  // Take ownership of the record and drop it from the map; the event (if
  // any) is destroyed when `transfer` goes out of scope.
  Transfer transfer = std::move(iter->second);
  transfers_.erase(iter);

  if (transfer.event) {
#ifdef DGL_USE_CUDA
    // Block until the recorded event -- and thus the copy -- has completed.
    CUDA_CALL(cudaEventSynchronize(transfer.event->id));
#endif
  }
  return transfer.dst;
}
/*! \brief Hand out sequential transfer ids, starting from 1. */
TransferId AsyncTransferer::GenerateId() {
  next_id_ += 1;
  return next_id_;
}
// FFI entry point: construct an AsyncTransferer for the given context.
DGL_REGISTER_GLOBAL("dataloading.async_transferer._CAPI_DGLAsyncTransfererCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  const DGLContext ctx = args[0];
  *rv = AsyncTransfererRef(std::make_shared<AsyncTransferer>(ctx));
});

// FFI entry point: start a transfer and return its integer id.
DGL_REGISTER_GLOBAL("dataloading.async_transferer._CAPI_DGLAsyncTransfererStartTransfer")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  AsyncTransfererRef xfer = args[0];
  const NDArray data = args[1];
  const DGLContext dst_ctx = args[2];
  const int id = xfer->StartTransfer(data, dst_ctx);
  *rv = id;
});

// FFI entry point: block on a transfer id and return the resulting array.
DGL_REGISTER_GLOBAL("dataloading.async_transferer._CAPI_DGLAsyncTransfererWait")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  AsyncTransfererRef xfer = args[0];
  const int id = args[1];
  *rv = xfer->Wait(id);
});
} // namespace dataloading
} // namespace dgl
/*!
* Copyright (c) 2020 by Contributors
 * \file dataloading/async_transferer.h
* \brief The AsyncTransferer class for copying the data to/from the GPU on a
* separate stream.
*/
#ifndef DGL_DATALOADING_ASYNC_TRANSFERER_H_
#define DGL_DATALOADING_ASYNC_TRANSFERER_H_
#include <dgl/runtime/c_runtime_api.h>
#include <dgl/runtime/ndarray.h>
#include <dgl/runtime/object.h>
#include <unordered_map>
#include <memory>
namespace dgl {
namespace dataloading {
/*!
 * \brief Class for copying NDArrays to/from a device on a separate stream,
 * so that transfers can overlap computation on the default stream.
 *
 * For GPU contexts a dedicated stream is created and copies are issued
 * asynchronously on it; for CPU contexts copies run synchronously.
 */
class AsyncTransferer : public runtime::Object {
 public:
  using TransferId = int;

  /*!
   * \brief Create a transferer for the given context.
   * \param ctx The context the internal stream is created on. Must be a GPU
   * context for copies to be asynchronous.
   */
  explicit AsyncTransferer(
      DGLContext ctx);

  ~AsyncTransferer();

  // disable copying: this object owns a stream and in-flight transfers
  AsyncTransferer(
      const AsyncTransferer&) = delete;
  AsyncTransferer& operator=(
      const AsyncTransferer&) = delete;

  /*!
   * \brief Begin copying `data` to the context `ctx`.
   * \param data The array to copy.
   * \param ctx The destination context.
   * \return An id that can be passed to Wait() to retrieve the result.
   */
  TransferId StartTransfer(
      runtime::NDArray data,
      DGLContext ctx);

  /*!
   * \brief Block until the given transfer finishes.
   * \param id The id returned by StartTransfer().
   * \return The array in the destination context.
   */
  runtime::NDArray Wait(
      TransferId id);

  // NOTE(review): the type key still says "ndarray" although the class now
  // lives under dataloading -- confirm whether FFI clients depend on the
  // current string before renaming it.
  static constexpr const char* _type_key = "ndarray.AsyncTransferer";
  DGL_DECLARE_OBJECT_TYPE_INFO(AsyncTransferer, Object);

 private:
  /*! \brief Opaque CUDA-event wrapper; defined in the .cc file. */
  struct Event;

  /*!
   * \brief Bookkeeping for one in-flight transfer.
   *
   * The previously declared `bool recorded` member was removed: it was never
   * initialized nor accessed anywhere, leaving an uninitialized-member hazard.
   */
  struct Transfer {
    std::unique_ptr<Event> event;  // null when the copy was synchronous (CPU)
    runtime::NDArray src;
    runtime::NDArray dst;
  };

  DGLContext ctx_;
  TransferId next_id_;
  std::unordered_map<TransferId, Transfer> transfers_;
  DGLStreamHandle stream_;

  /*! \brief Generate the next sequential transfer id. */
  TransferId GenerateId();
};

DGL_DEFINE_OBJECT_REF(AsyncTransfererRef, AsyncTransferer);
} // namespace dataloading
} // namespace dgl
#endif // DGL_DATALOADING_ASYNC_TRANSFERER_H_
import dgl
import unittest
import backend as F
from dgl.dataloading import AsyncTransferer
def test_async_transferer_to_other():
    """ Copy a CPU tensor to the backend's context and verify the result. """
    cpu_ones = F.ones([100,75,25], dtype=F.int32, ctx=F.cpu())
    transferer = AsyncTransferer(F.ctx())
    transfer = transferer.async_copy(cpu_ones, F.ctx())
    other_ones = transfer.wait()

    # The result must land on the target context with identical contents.
    assert F.context(other_ones) == F.ctx()
    assert F.array_equal(F.copy_to(other_ones, ctx=F.cpu()), cpu_ones)
def test_async_transferer_from_other():
    """ Copy a tensor from the backend's context back to the CPU. """
    other_ones = F.ones([100,75,25], dtype=F.int32, ctx=F.ctx())
    transferer = AsyncTransferer(F.ctx())
    transfer = transferer.async_copy(other_ones, F.cpu())
    cpu_ones = transfer.wait()

    # The result must be a CPU tensor with identical contents.
    assert F.context(cpu_ones) == F.cpu()
    assert F.array_equal(F.copy_to(other_ones, ctx=F.cpu()), cpu_ones)
# Allow running this test module directly, outside of a test runner.
if __name__ == '__main__':
    test_async_transferer_to_other()
    test_async_transferer_from_other()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment