Unverified Commit 806def67 authored by nv-dlasalle's avatar nv-dlasalle Committed by GitHub
Browse files

[Cleanup] Remove async_transferer (#4505)



* Remove async_transferer

* remove test

* Remove AsyncTransferer
Co-authored-by: default avatarXin Yao <xiny@nvidia.com>
Co-authored-by: default avatarXin Yao <yaox12@outlook.com>
parent fce96140
......@@ -155,7 +155,6 @@ file(GLOB DGL_SRC
src/runtime/*.cc
src/geometry/*.cc
src/geometry/cpu/*.cc
src/dataloading/*.cc
src/partition/*.cc
)
......
......@@ -20,7 +20,6 @@ from .cluster_gcn import *
from .shadow import *
from . import negative_sampler
from .async_transferer import AsyncTransferer
from .. import backend as F
......
"""API for transferring data to the GPU over a second stream."""
from .. import backend as F
from .. import ndarray
from .. import utils
from .._ffi.function import _init_api
class Transfer(object):
    """A future-like handle for a single in-flight asynchronous copy.

    Instances are produced by ``AsyncTransferer.async_copy``; call
    :meth:`wait` to block until the copy completes and obtain the result.
    """

    def __init__(self, transfer_id, handle):
        """Record the identifiers needed to later retrieve the result.

        Parameters
        ----------
        transfer_id : int
            The id of the asynchronous transfer.
        handle : DGLAsyncTransferer
            The handle of the DGLAsyncTransferer object that initiated the
            transfer.
        """
        self._handle = handle
        self._transfer_id = transfer_id

    def wait(self):
        """Block until this transfer finishes, and return the result.

        Returns
        -------
        Tensor
            The new tensor on the target context.
        """
        dgl_array = _CAPI_DGLAsyncTransfererWait(self._handle, self._transfer_id)
        return F.zerocopy_from_dgl_ndarray(dgl_array)
class AsyncTransferer(object):
    """ Class for initiating asynchronous copies to the GPU on a second
    GPU stream.

    To initiate a transfer to a GPU:

    >>> tensor_cpu = torch.ones(100000).pin_memory()
    >>> transferer = dgl.dataloading.AsyncTransferer(torch.device(0))
    >>> future = transferer.async_copy(tensor_cpu, torch.device(0))

    And then to wait for the transfer to finish and get a copy of the tensor on
    the GPU.

    >>> tensor_gpu = future.wait()
    """

    def __init__(self, device):
        """ Create a new AsyncTransferer object.

        Parameters
        ----------
        device : Device or context object.
            The context in which the second stream will be created. Must be a
            GPU context for the copy to be asynchronous.
        """
        # Accept either a DGL context directly or a framework device object.
        ctx = device if isinstance(device, ndarray.DGLContext) \
            else utils.to_dgl_context(device)
        self._handle = _CAPI_DGLAsyncTransfererCreate(ctx)

    def async_copy(self, tensor, device):
        """ Initiate an asynchronous copy on the internal stream. For this call
        to be asynchronous, the context the AsyncTranserer is created with must
        be a GPU context, and the input tensor must be in pinned memory.

        Currently, only transfers to the GPU are supported.

        Parameters
        ----------
        tensor : Tensor
            The tensor to transfer.
        device : Device or context object.
            The context to transfer to.

        Returns
        -------
        Transfer
            A Transfer object that can be waited on to get the tensor in the
            new context.
        """
        ctx = device if isinstance(device, ndarray.DGLContext) \
            else utils.to_dgl_context(device)
        # Only host-to-GPU transfers are implemented on the C++ side.
        if ctx.device_type != ndarray.DGLContext.STR2MASK["gpu"]:
            raise ValueError("'device' must be a GPU device.")
        dgl_tensor = F.zerocopy_to_dgl_ndarray(tensor)
        transfer_id = _CAPI_DGLAsyncTransfererStartTransfer(
            self._handle, dgl_tensor, ctx)
        return Transfer(transfer_id, self._handle)

_init_api("dataloading.async_transferer", "dgl._dataloading.async_transferer")
/*!
* Copyright (c) 2020 by Contributors
* \file array/async_transferer.cc
* \brief The AsyncTransferer implementation.
*/
#include "async_transferer.h"
#include <dgl/packed_func_ext.h>
#include <dgl/runtime/registry.h>
#include <dgl/runtime/device_api.h>
#include <vector>
#include <utility>
#ifdef DGL_USE_CUDA
#include <cuda_runtime.h>
#include "../runtime/cuda/cuda_common.h"
#endif
namespace dgl {
using namespace runtime;
namespace dataloading {
using TransferId = AsyncTransferer::TransferId;
// RAII wrapper around a CUDA event used to mark completion of an enqueued
// copy. In CPU-only builds the struct is empty and is never instantiated
// (StartTransfer only creates one on the CUDA path).
struct AsyncTransferer::Event {
#ifdef DGL_USE_CUDA
  cudaEvent_t id;
  ~Event() {
    // NOTE(review): assumes `id` was successfully created with
    // cudaEventCreate before destruction -- true for the single call site
    // in StartTransfer(); confirm if new call sites are added.
    CUDA_CALL(cudaEventDestroy(id));
  }
#endif
};
// Construct a transferer bound to `ctx`. For GPU contexts a dedicated
// stream is created so copies can run independently of the default stream;
// for any other context no stream is created and copies fall back to the
// synchronous path in StartTransfer().
AsyncTransferer::AsyncTransferer(DGLContext ctx)
    : ctx_(ctx), next_id_(0), transfers_(), stream_(nullptr) {
  const bool needs_stream = (ctx_.device_type == kDLGPU);
  if (needs_stream) {
    stream_ = DeviceAPI::Get(ctx_)->CreateStream(ctx_);
  }
}
// Release the dedicated copy stream, if one was created (GPU contexts only).
AsyncTransferer::~AsyncTransferer() {
  if (stream_ == nullptr) {
    return;
  }
  DeviceAPI::Get(ctx_)->FreeStream(ctx_, stream_);
}
// Begin copying `src` into a freshly allocated array on `dst_ctx` and
// return an id that can later be passed to Wait(). On the GPU path the
// copy is enqueued on the dedicated stream and a CUDA event is recorded
// immediately after it, so Wait() can block on exactly this copy. On the
// CPU path the copy completes synchronously before this function returns.
TransferId AsyncTransferer::StartTransfer(
    NDArray src,
    DGLContext dst_ctx) {
  const TransferId id = GenerateId();

  Transfer t;
  t.src = src;

  // Allocate the destination with the same shape and dtype as the source.
  DLDataType dtype = src->dtype;
  std::vector<int64_t> shape(src->shape, src->shape+src->ndim);
  t.dst = NDArray::Empty(shape, dtype, dst_ctx);

  if (stream_) {
#ifdef DGL_USE_CUDA
    // get tensor information
    t.event.reset(new Event);
    CUDA_CALL(cudaEventCreate(&t.event->id));

    // Enqueue the async copy first, then record the event so that the
    // event only fires once this copy has finished on the stream.
    t.dst.CopyFrom(t.src, stream_);
    CUDA_CALL(cudaEventRecord(t.event->id, static_cast<cudaStream_t>(stream_)));
#else
    // stream_ is only non-null for GPU contexts, which require CUDA support.
    LOG(FATAL) << "GPU support not compiled.";
#endif
  } else {
    // copy synchronously since we don't have the notion of streams on the CPU
    t.event.reset(nullptr);
    t.dst.CopyFrom(t.src);
  }

  transfers_.emplace(id, std::move(t));
  return id;
}
// Block until transfer `id` has completed and return the destination
// array. The transfer record is erased from the table, so each id can be
// waited on at most once; an unknown id is a fatal CHECK failure.
NDArray AsyncTransferer::Wait(
    const TransferId id) {
  auto iter = transfers_.find(id);
  CHECK(iter != transfers_.end()) << "Unknown transfer: " << id;

  // Take ownership of the record before erasing it from the map.
  Transfer t = std::move(iter->second);
  transfers_.erase(iter);

  // A null event means the copy was performed synchronously (CPU path)
  // and is already complete.
  if (t.event) {
#ifdef DGL_USE_CUDA
    // wait for it
    CUDA_CALL(cudaEventSynchronize(t.event->id));
#endif
  }
  return t.dst;
}
// Produce a fresh, monotonically increasing transfer id. No overflow
// handling is performed (ids are plain ints), matching prior behavior.
TransferId AsyncTransferer::GenerateId() {
  next_id_ += 1;
  return next_id_;
}
// FFI entry point: create an AsyncTransferer bound to the given context
// and return it wrapped in an object reference.
DGL_REGISTER_GLOBAL("dataloading.async_transferer._CAPI_DGLAsyncTransfererCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  DGLContext ctx = args[0];
  *rv = AsyncTransfererRef(std::make_shared<AsyncTransferer>(ctx));
});

// FFI entry point: start an asynchronous copy of `array` to `ctx` and
// return the integer transfer id to wait on.
DGL_REGISTER_GLOBAL("dataloading.async_transferer._CAPI_DGLAsyncTransfererStartTransfer")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  AsyncTransfererRef ref = args[0];
  NDArray array = args[1];
  DGLContext ctx = args[2];
  int id = ref->StartTransfer(array, ctx);
  *rv = id;
});

// FFI entry point: block until the transfer identified by `id` completes
// and return the destination array.
DGL_REGISTER_GLOBAL("dataloading.async_transferer._CAPI_DGLAsyncTransfererWait")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  AsyncTransfererRef ref = args[0];
  int id = args[1];
  NDArray arr = ref->Wait(id);
  *rv = arr;
});
} // namespace dataloading
} // namespace dgl
/*!
* Copyright (c) 2020 by Contributors
* \file array/async_transferer.h
* \brief The AsyncTransferer class for copying the data to/from the GPU on a
* separate stream.
*/
#ifndef DGL_DATALOADING_ASYNC_TRANSFERER_H_
#define DGL_DATALOADING_ASYNC_TRANSFERER_H_
#include <dgl/runtime/c_runtime_api.h>
#include <dgl/runtime/ndarray.h>
#include <dgl/runtime/object.h>
#include <unordered_map>
#include <memory>
namespace dgl {
namespace dataloading {
/*!
 * \brief Manages asynchronous copies of NDArrays to/from the GPU on a
 * dedicated stream. Constructed per-context; each StartTransfer() returns
 * an id which must later be passed to Wait() to retrieve the result.
 */
class AsyncTransferer : public runtime::Object {
 public:
  using TransferId = int;

  /*!
   * \brief Create a transferer for `ctx`. A GPU context gets its own copy
   * stream; any other context falls back to synchronous copies.
   */
  explicit AsyncTransferer(
      DGLContext ctx);
  ~AsyncTransferer();

  // Non-copyable: owns a stream handle and a table of in-flight transfers.
  AsyncTransferer(
      const AsyncTransferer&) = delete;
  AsyncTransferer& operator=(
      const AsyncTransferer&) = delete;

  /*! \brief Begin copying `data` to `ctx`; returns an id to wait on. */
  TransferId StartTransfer(
      runtime::NDArray data,
      DGLContext ctx);

  /*! \brief Block until transfer `id` completes; returns the destination. */
  runtime::NDArray Wait(
      TransferId id);

  static constexpr const char* _type_key = "ndarray.AsyncTransferer";
  DGL_DECLARE_OBJECT_TYPE_INFO(AsyncTransferer, Object);

 private:
  struct Event;  // Opaque; wraps the CUDA event in CUDA builds.

  // Book-keeping for a single in-flight copy.
  struct Transfer {
    std::unique_ptr<Event> event;  // Null for synchronous (CPU) copies.
    // FIX: was left uninitialized (indeterminate value for a POD bool).
    // NOTE(review): this flag appears unwritten and unread by the visible
    // implementation -- candidate for removal; confirm against other TUs.
    bool recorded = false;
    runtime::NDArray src;
    runtime::NDArray dst;
  };

  DGLContext ctx_;
  TransferId next_id_;
  std::unordered_map<TransferId, Transfer> transfers_;
  DGLStreamHandle stream_;

  /*! \brief Generate the next monotonically increasing transfer id. */
  TransferId GenerateId();
};
DGL_DEFINE_OBJECT_REF(AsyncTransfererRef, AsyncTransferer);
} // namespace dataloading
} // namespace dgl
#endif // DGL_DATALOADING_ASYNC_TRANSFERER_H_
import dgl
import unittest
import backend as F
from dgl.dataloading import AsyncTransferer
@unittest.skipIf(F._default_context_str == 'cpu',
                 reason="CPU transfer not allowed")
def test_async_transferer_to_other():
    """Copy a CPU tensor to the default device and verify the contents."""
    src = F.ones([100, 75, 25], dtype=F.int32, ctx=F.cpu())
    transferer = AsyncTransferer(F.ctx())
    future = transferer.async_copy(src, F.ctx())
    result = future.wait()
    # The result must live on the target context and match the source data.
    assert F.context(result) == F.ctx()
    assert F.array_equal(F.copy_to(result, ctx=F.cpu()), src)
def test_async_transferer_from_other():
    """Transfers targeting the CPU are unsupported and must raise ValueError."""
    src = F.ones([100, 75, 25], dtype=F.int32, ctx=F.ctx())
    transferer = AsyncTransferer(F.ctx())
    raised = False
    try:
        transferer.async_copy(src, F.cpu())
    except ValueError:
        # correctly threw an error
        raised = True
    # If no ValueError was raised, the API silently accepted a bad target.
    assert raised
# Allow running this test module directly, without a pytest/unittest runner.
if __name__ == '__main__':
    test_async_transferer_to_other()
    test_async_transferer_from_other()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment