Unverified commit 62b5f50a authored by Chang Liu, committed by GitHub

[Feature] Import PyTorch's `pin_memory()` method for DGL graph structure (#5366)

parent 27b008b9
......@@ -64,6 +64,9 @@ struct COOMatrix {
data(darr),
row_sorted(rsorted),
col_sorted(csorted) {
is_pinned = (aten::IsNullArray(row) || row.IsPinned()) &&
(aten::IsNullArray(col) || col.IsPinned()) &&
(aten::IsNullArray(data) || data.IsPinned());
CheckValidity();
}
......@@ -133,6 +136,20 @@ struct COOMatrix {
col_sorted);
}
/** @brief Return a copy of this matrix in pinned (page-locked) memory. */
inline COOMatrix PinMemory() {
if (is_pinned) return *this;
auto new_coo = COOMatrix(
num_rows, num_cols, row.PinMemory(), col.PinMemory(),
aten::IsNullArray(data) ? data : data.PinMemory(), row_sorted,
col_sorted);
CHECK(new_coo.is_pinned)
<< "An internal DGL error has occured while trying to pin a COO "
"matrix. Please file a bug at 'https://github.com/dmlc/dgl/issues' "
"with the above stacktrace.";
return new_coo;
}
/**
* @brief Pin the row, col and data (if not Null) of the matrix.
* @note This is an in-place method. Behavior depends on the current context,
......
......@@ -60,6 +60,9 @@ struct CSRMatrix {
indices(iarr),
data(darr),
sorted(sorted_flag) {
is_pinned = (aten::IsNullArray(indptr) || indptr.IsPinned()) &&
(aten::IsNullArray(indices) || indices.IsPinned()) &&
(aten::IsNullArray(data) || data.IsPinned());
CheckValidity();
}
......@@ -126,6 +129,19 @@ struct CSRMatrix {
aten::IsNullArray(data) ? data : data.CopyTo(ctx), sorted);
}
/** @brief Return a copy of this matrix in pinned (page-locked) memory. */
inline CSRMatrix PinMemory() {
if (is_pinned) return *this;
auto new_csr = CSRMatrix(
num_rows, num_cols, indptr.PinMemory(), indices.PinMemory(),
aten::IsNullArray(data) ? data : data.PinMemory(), sorted);
CHECK(new_csr.is_pinned)
<< "An internal DGL error has occured while trying to pin a CSR "
"matrix. Please file a bug at 'https://github.com/dmlc/dgl/issues' "
"with the above stacktrace.";
return new_csr;
}
/**
* @brief Pin the indptr, indices and data (if not Null) of the matrix.
* @note This is an in-place method. Behavior depends on the current context,
......
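Both sparse formats now expose the same out-of-place pinning entry point. A minimal sketch of the intended call pattern follows (hypothetical helper, not part of this commit; it assumes DGL's internal aten headers are on the include path):

// Hypothetical helper: make sure a CPU-resident COO structure is page-locked
// before issuing asynchronous host-to-device transfers.
#include <dgl/aten/coo.h>  // assumed include path for dgl::aten::COOMatrix

dgl::aten::COOMatrix EnsurePinned(dgl::aten::COOMatrix coo) {
  // PinMemory() short-circuits when is_pinned is already true, so this is a
  // no-op for matrices whose row/col/data arrays are already pinned.
  return coo.PinMemory();
}
// CSRMatrix::PinMemory() follows the same pattern for indptr/indices/data.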
......@@ -50,11 +50,13 @@ class DeviceAPI {
* @brief Check whether the device is available.
*/
virtual bool IsAvailable() { return true; }
/**
* @brief Set the environment device id to ctx
* @param ctx The context to be set.
*/
virtual void SetDevice(DGLContext ctx) = 0;
/**
* @brief Get attribute of specified device.
* @param ctx The device context
......@@ -64,6 +66,7 @@ class DeviceAPI {
*/
virtual void GetAttr(
DGLContext ctx, DeviceAttrKind kind, DGLRetValue* rv) = 0;
/**
* @brief Allocate a data space on device.
* @param ctx The device context to perform operation.
......@@ -76,28 +79,51 @@ class DeviceAPI {
virtual void* AllocDataSpace(
DGLContext ctx, size_t nbytes, size_t alignment,
DGLDataType type_hint) = 0;
/**
* @brief Free a data space on device.
* @param ctx The device context to perform operation.
* @param ptr The data space.
*/
virtual void FreeDataSpace(DGLContext ctx, void* ptr) = 0;
/**
* @brief copy data from one place to another
* @param from The source array.
* @param from_offset The byte offset in the from.
* @param to The target array.
* @param to_offset The byte offset in the to.
* @param num_bytes The size of the memory in bytes
* @param ctx_from The source context
* @param ctx_to The target context
* @param type_hint The type of elements, only neded by certain backends.
* can be useful for cross device endian converison.
* @param num_bytes The size of the memory in bytes.
* @param ctx_from The source context.
* @param ctx_to The target context.
* @param type_hint The type of elements, only needed by certain backends,
* can be useful for cross device endian conversion.
*/
virtual void CopyDataFromTo(
const void* from, size_t from_offset, void* to, size_t to_offset,
size_t num_bytes, DGLContext ctx_from, DGLContext ctx_to,
DGLDataType type_hint) = 0;
/**
* @brief copy data between device and CPU while recording the event.
* @param from The source array.
* @param from_offset The byte offset in the from.
* @param to The target array.
* @param to_offset The byte offset in the to.
* @param num_bytes The size of the memory in bytes.
* @param ctx_from The source context.
* @param ctx_to The target context.
* @param type_hint The type of elements, only needed by certain backends,
* can be useful for cross device endian conversion.
* @param pytorch_ctx The context pointer from PyTorch's CachingHostAllocator.
* @note This function only works when PyTorch CachingHostAllocator is
* available.
*/
virtual void RecordedCopyDataFromTo(
void* from, size_t from_offset, void* to, size_t to_offset,
size_t num_bytes, DGLContext ctx_from, DGLContext ctx_to,
DGLDataType type_hint, void* pytorch_ctx) = 0;
/**
* @brief Create a new stream of execution.
*
......@@ -119,16 +145,19 @@ class DeviceAPI {
* @param stream The stream to be sync.
*/
virtual void StreamSync(DGLContext ctx, DGLStreamHandle stream) = 0;
/**
* @brief Set the stream
* @param ctx The context to set stream.
* @param stream The stream to be set.
*/
virtual void SetStream(DGLContext ctx, DGLStreamHandle stream) {}
/**
* @brief Get the stream
*/
virtual DGLStreamHandle GetStream() const { return nullptr; }
/**
* @brief Synchronize 2 streams of execution.
*
......@@ -160,6 +189,27 @@ class DeviceAPI {
*/
DGL_DLL virtual void UnpinData(void* ptr);
/**
* @brief Allocate the pinned memory using PyTorch CachingHostAllocator.
*
* @param nbytes The size to be pinned.
* @param ctx Pointer to the context pointer from PyTorch's
* CachingHostAllocator.
* @param deleter Pointer to the deleter function from PyTorch's
* CachingHostAllocator.
*/
DGL_DLL virtual void* AllocPinnedDataSpace(
size_t nbytes, void** ctx, void** deleter);
/**
* @brief 'Deallocate' the pinned memory from PyTorch CachingHostAllocator.
* @note It avoids unnecessary cudaFreeHost calls and puts the memory
* block into CachingHostAllocator's free list.
* @param deleter Pointer to the deleter function from PyTorch's
* CachingHostAllocator.
*/
DGL_DLL virtual void FreePinnedDataSpace(void** deleter);
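A rough illustration of how the two entry points above are meant to pair up (hypothetical caller, not part of this commit; it assumes the CUDA DeviceAPI is registered and the PyTorch tensoradapter provides the CachingHostAllocator):

#include <dgl/runtime/device_api.h>  // assumed include path/namespace

void PinnedRoundTrip(size_t nbytes) {
  void* pytorch_ctx = nullptr;  // storage context returned by the allocator
  void* raw_deleter = nullptr;  // deleter returned by the allocator
  void* host_buf =
      dgl::runtime::DeviceAPI::Get(kDGLCUDA)->AllocPinnedDataSpace(
          nbytes, &pytorch_ctx, &raw_deleter);
  (void)host_buf;  // ... fill the buffer, issue async copies, record streams ...
  // Hand the block back to the CachingHostAllocator free list; no
  // cudaFreeHost call is issued here.
  dgl::runtime::DeviceAPI::Get(kDGLCUDA)->FreePinnedDataSpace(&raw_deleter);
}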
/**
* @brief Check whether the memory is in pinned memory.
*/
......@@ -184,6 +234,7 @@ class DeviceAPI {
*/
DGL_DLL virtual void* AllocWorkspace(
DGLContext ctx, size_t nbytes, DGLDataType type_hint = {});
/**
* @brief Free temporal workspace in backend execution.
*
......@@ -201,7 +252,7 @@ class DeviceAPI {
DGL_DLL static DeviceAPI* Get(DGLContext ctx, bool allow_missing = false);
/**
* @brief Get device API based on context.
* @brief Get device API based on device type.
* @param dev_type The device type
* @param allow_missing Whether allow missing
* @return The corresponding device API.
......
......@@ -154,6 +154,7 @@ class NDArray {
else
return static_cast<T*>(operator->()->data);
}
/**
* @brief Copy data content from/into another array.
* @param other The source array to be copied from.
......@@ -171,19 +172,34 @@ class NDArray {
* @return The array under another context.
*/
inline NDArray CopyTo(const DGLContext& ctx) const;
/**
* @brief Return a new array with a copy of the content.
*/
inline NDArray Clone() const;
/**
* @brief Return a copy of the current instance of NDArray in pinned
* (page-locked) memory.
* @note This is an out-of-place method, which utilizes PyTorch's
* CachingHostAllocator for allocating pinned memory and copying data
* from the current NDArray. As a result, PyTorch is responsible for
* managing the lifecycle of the returned NDArray, including deciding
* when to flush the data for reuse or call cudaFreeHost. The current
* context must be kDGLCPU; otherwise, an error will be thrown.
*/
inline NDArray PinMemory();
/**
* @brief In-place method to pin the current array by calling PinContainer
* on the underlying NDArray:Container.
* @note This is an in-place method. Behavior depends on the current context,
* kDGLCPU: will be pinned;
* IsPinned: directly return;
* kDGLCUDA: invalid, will throw an error.
* @note This is an in-place method that flags the memory as page-locked by
* utilizing cudaHostRegister at the underlying level to pin the current
* instance of NDArray. The current context must be kDGLCPU; otherwise,
* an error will be thrown.
*/
inline void PinMemory_();
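The two declarations above differ in where the pinned memory comes from; a condensed sketch of when each applies (hypothetical helpers, not part of this commit; both require the array to live on kDGLCPU):

#include <dgl/runtime/ndarray.h>  // assumed include path/namespace

using dgl::runtime::NDArray;

// Out-of-place: copies the data into pinned memory owned by PyTorch's
// CachingHostAllocator; the copy's lifetime is managed via the PyTorch deleter.
NDArray PinCopy(NDArray arr) { return arr.PinMemory(); }

// In-place: page-locks the existing buffer through cudaHostRegister; no new
// allocation, the array is simply flagged as pinned.
void PinInPlace(NDArray& arr) { arr.PinMemory_(); }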
/**
* @brief In-place method to unpin the current array by calling UnpinContainer
* on the underlying NDArray:Container.
......@@ -192,26 +208,31 @@ class NDArray {
* others: directly return.
*/
inline void UnpinMemory_();
/**
* @brief Check if the array is pinned.
*/
inline bool IsPinned() const;
/**
* @brief Record streams that are using the underlying tensor.
* @param stream The stream that is using the underlying tensor.
*/
inline void RecordStream(DGLStreamHandle stream) const;
/**
* @brief Load NDArray from stream
* @param stream The input data stream
* @return Whether load is successful
*/
bool Load(dmlc::Stream* stream);
/**
* @brief Save NDArray to stream
* @param stream The output data stream
*/
void Save(dmlc::Stream* stream) const;
/**
* @brief Create a NDArray that shares the data memory with the current one.
* @param shape The shape of the new array.
......@@ -221,27 +242,40 @@ class NDArray {
*/
DGL_DLL NDArray
CreateView(std::vector<int64_t> shape, DGLDataType dtype, int64_t offset = 0);
/**
* @brief Create an empty NDArray.
* @param shape The shape of the new array.
* @param dtype The data type of the new array.
* @param ctx The context of the Array.
* @param ctx The context of the array.
* @return The created Array
*/
DGL_DLL static NDArray Empty(
std::vector<int64_t> shape, DGLDataType dtype, DGLContext ctx);
/**
* @brief Create an empty NDArray in pinned memory.
* @param shape The shape of the new array.
* @param dtype The data type of the new array.
* @param ctx The context of the array.
* @return The created array.
*/
DGL_DLL static NDArray PinnedEmpty(
std::vector<int64_t> shape, DGLDataType dtype, DGLContext ctx);
/**
* @brief Create an empty NDArray with shared memory.
* @param name The name of shared memory.
* @param shape The shape of the new array.
* @param dtype The data type of the new array.
* @param ctx The context of the Array.
* @param ctx The context of the array.
* @param is_create whether to create shared memory.
* @return The created Array
*/
DGL_DLL static NDArray EmptyShared(
const std::string& name, std::vector<int64_t> shape, DGLDataType dtype,
DGLContext ctx, bool is_create);
/**
* @brief Get the size of the array in the number of bytes.
*/
......@@ -288,6 +322,18 @@ class NDArray {
DGL_DLL static void CopyFromTo(
DGLArray* from, DGLArray* to, DGLStreamHandle stream);
/**
* @brief Function to copy data between device and CPU while recording the
* event.
* @param from The source array.
* @param to The target array.
* @param pytorch_ctx The context pointer from PyTorch's CachingHostAllocator.
* @note This function fuses data-copy and event recording to ensure
* CachingHostAllocator works properly.
*/
DGL_DLL static void RecordedCopyFromTo(
DGLArray* from, DGLArray* to, void* pytorch_ctx);
/**
* @brief Function to pin the DGLArray of a Container.
* @param ptr The container to be pinned.
......@@ -428,7 +474,20 @@ struct NDArray::Container {
/** @brief The internal array object */
std::atomic<int> ref_counter_{0};
/** @brief Whether underlying dl_tensor is pinned by DGL. */
bool pinned_by_dgl_{false};
/** @brief Whether underlying dl_tensor is pinned by PyTorch
* (CachingHostAllocator). */
bool pinned_by_pytorch_{false};
/** @brief The PyTorch storage ctx ptr if pinned_by_pytorch_ = True. */
void* pytorch_ctx_{nullptr};
/** @brief Pointer to the corresponding PyTorch deleter if
* pinned_by_pytorch_ = True. */
void* pytorch_raw_deleter_{nullptr};
};
// implementations of inline functions
......@@ -455,6 +514,22 @@ inline void NDArray::CopyFrom(DGLArray* other) {
inline void NDArray::CopyFrom(const NDArray& other) {
CHECK(other.data_ != nullptr);
// Copy between two devices
if (data_->dl_tensor.ctx.device_type !=
other.data_->dl_tensor.ctx.device_type) {
CHECK(data_ != nullptr);
auto to_ctx_type = data_->dl_tensor.ctx.device_type;
auto cpu_data = (to_ctx_type == kDGLCPU ? data_ : other.data_);
// Pinned by PyTorch
if (cpu_data->pinned_by_pytorch_) {
// To ensure correct behavior, the event must be recorded after
// cudaMemcpyAsync as long as the memory is pinned by PyTorch.
void* pytorch_ctx = cpu_data->pytorch_ctx_;
RecordedCopyFromTo(
&(other.data_->dl_tensor), &(data_->dl_tensor), pytorch_ctx);
return;
}
}
CopyFrom(&(other.data_->dl_tensor));
}
......@@ -465,23 +540,50 @@ inline void NDArray::CopyTo(DGLArray* other) const {
inline void NDArray::CopyTo(const NDArray& other) const {
CHECK(other.data_ != nullptr);
// copy between two devices
if (data_->dl_tensor.ctx.device_type !=
other.data_->dl_tensor.ctx.device_type) {
CHECK(data_ != nullptr);
auto from_ctx_type = data_->dl_tensor.ctx.device_type;
auto cpu_data = (from_ctx_type == kDGLCPU ? data_ : other.data_);
// pinned by PyTorch
if (cpu_data->pinned_by_pytorch_) {
// To ensure correct behavior, the event must be recorded after
// cudaMemcpyAsync as long as the memory is pinned by PyTorch.
void* pytorch_ctx = cpu_data->pytorch_ctx_;
RecordedCopyFromTo(
&(data_->dl_tensor), &(other.data_->dl_tensor), pytorch_ctx);
return;
}
}
CopyTo(&(other.data_->dl_tensor));
}
inline NDArray NDArray::CopyTo(const DGLContext& ctx) const {
CHECK(data_ != nullptr);
const DGLArray* dptr = operator->();
const DGLArray* array = operator->();
NDArray ret = Empty(
std::vector<int64_t>(dptr->shape, dptr->shape + dptr->ndim), dptr->dtype,
ctx);
std::vector<int64_t>(array->shape, array->shape + array->ndim),
array->dtype, ctx);
this->CopyTo(ret);
return ret;
}
inline NDArray NDArray::Clone() const {
CHECK(data_ != nullptr);
const DGLArray* dptr = operator->();
return this->CopyTo(dptr->ctx);
const DGLArray* array = operator->();
return this->CopyTo(array->ctx);
}
inline NDArray NDArray::PinMemory() {
CHECK(data_ != nullptr);
const DGLArray* array = operator->();
auto ctx = array->ctx;
NDArray ret = PinnedEmpty(
std::vector<int64_t>(array->shape, array->shape + array->ndim),
array->dtype, ctx);
this->CopyTo(ret);
return ret;
}
inline void NDArray::PinMemory_() {
......
......@@ -134,6 +134,70 @@ class TensorDispatcher {
auto entry = entrypoints_[Op::kCUDACurrentStream];
return FUNCCAST(tensoradapter::CUDACurrentStream, entry)();
}
/**
* @brief Allocate a piece of pinned CPU memory via PyTorch
* CachingHostAllocator.
* @note Used in CUDADeviceAPI::AllocPinnedDataSpace().
* @param nbytes The size to be allocated.
* @param ctx Pointer to the PyTorch storage ctx ptr returned from the
* allocator.
* @param deleter Pointer to the delete function ptr returned from the
* allocator.
* @return Raw pointer to the allocated memory.
*/
inline void* CUDAAllocHostWorkspace(
size_t nbytes, void** ctx, void** deleter) {
auto entry = entrypoints_[Op::kCUDARawHostAlloc];
auto alloc_func = FUNCCAST(tensoradapter::CUDARawHostAlloc, entry);
return alloc_func(nbytes, ctx, deleter);
}
/**
* @brief Insert the pinned memory block (allocated via PyTorch
* CachingHostAllocator) back to the free list for future usage (ref:
* pytorch/pytorch/blob/master/aten/src/ATen/cuda/CachingHostAllocator.cpp).
* @note Used in CUDADeviceAPI::FreePinnedDataSpace().
* @param deleter Pointer to the delete function ptr returned from the
* allocator.
*/
inline void CUDAFreeHostWorkspace(void** deleter) {
auto entry = entrypoints_[Op::kCUDARawHostDelete];
FUNCCAST(tensoradapter::CUDARawHostDelete, entry)(deleter);
}
/**
* @brief Invoke the record_event function call from PyTorch
* CachingHostAllocator.
* @note This function associates a CUDA stream (used by a copy kernel) with
* the pinned data. In the free path of this data, which is achieved by
* calling CUDAFreeHostWorkspace, the set of associated streams is then
* consumed to ensure proper functionality (ref:
* pytorch/pytorch/blob/master/aten/src/ATen/cuda/CachingHostAllocator.cpp).
* Used in CUDADeviceAPI::RecordedCopyDataFromTo().
*
* @param data Pointer of the tensor to be recorded.
* @param ctx PyTorch storage ctx ptr returned from the allocator.
* @param stream The stream that currently consumes this tensor.
* @param device_id Device of the tensor.
*/
inline void CUDARecordHostAlloc(
void* data, void* ctx, cudaStream_t stream, int device_id) {
auto entry = entrypoints_[Op::kCUDARecordHostAlloc];
auto recorded_alloc = FUNCCAST(tensoradapter::CUDARecordHostAlloc, entry);
recorded_alloc(data, ctx, stream, device_id);
}
/**
* @brief Release cached pinned memory allocations via cudaFreeHost.
* @note Used in CUDADeviceAPI::PinData() before pinning any host memory by
* DGL.
*/
inline void CUDAHostAllocatorEmptyCache() {
auto entry = entrypoints_[Op::kCUDAHostAllocatorEmptyCache];
FUNCCAST(tensoradapter::CUDAHostAllocatorEmptyCache, entry)();
}
#endif // DGL_USE_CUDA
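Taken together, the four new dispatcher entry points are expected to be driven in an allocate, copy, record, free sequence; a condensed sketch follows (hypothetical, not part of this commit; it assumes DGL_USE_CUDA and a PyTorch build that ships the CachingHostAllocator):

#include <cuda_runtime.h>
#include <dgl/runtime/tensordispatch.h>  // assumed include path/namespace

void PinnedH2DCopy(void* device_dst, size_t nbytes, cudaStream_t stream,
                   int device_id) {
  auto* td = dgl::runtime::TensorDispatcher::Global();
  if (!td->IsAvailable()) return;  // tensoradapter (PyTorch) not loaded
  void* ctx = nullptr;
  void* deleter = nullptr;
  void* host_src = td->CUDAAllocHostWorkspace(nbytes, &ctx, &deleter);
  // ... fill host_src with the payload ...
  cudaMemcpyAsync(device_dst, host_src, nbytes, cudaMemcpyHostToDevice, stream);
  // Record the consuming stream so the block is only reused once it is safe.
  td->CUDARecordHostAlloc(host_src, ctx, stream, device_id);
  // Return the block to the allocator's free list (no cudaFreeHost here).
  td->CUDAFreeHostWorkspace(&deleter);
}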
/**
......@@ -149,7 +213,7 @@ class TensorDispatcher {
auto entry = entrypoints_[Op::kRecordStream];
FUNCCAST(tensoradapter::RecordStream, entry)
(ptr, static_cast<cudaStream_t>(stream), device_id);
#endif // DGL_USE_CUDA
#endif
}
private:
......@@ -164,9 +228,12 @@ class TensorDispatcher {
* Must match the functions in tensoradapter/include/tensoradapter.h.
*/
static constexpr const char* names_[] = {
"CPURawAlloc", "CPURawDelete",
"CPURawAlloc", "CPURawDelete",
#ifdef DGL_USE_CUDA
"CUDARawAlloc", "CUDARawDelete", "CUDACurrentStream", "RecordStream",
"CUDARawAlloc", "CUDARawDelete",
"CUDACurrentStream", "RecordStream",
"CUDARawHostAlloc", "CUDARawHostDelete",
"CUDARecordHostAlloc", "CUDAHostAllocatorEmptyCache",
#endif // DGL_USE_CUDA
};
......@@ -180,6 +247,10 @@ class TensorDispatcher {
static constexpr int kCUDARawDelete = 3;
static constexpr int kCUDACurrentStream = 4;
static constexpr int kRecordStream = 5;
static constexpr int kCUDARawHostAlloc = 6;
static constexpr int kCUDARawHostDelete = 7;
static constexpr int kCUDARecordHostAlloc = 8;
static constexpr int kCUDAHostAllocatorEmptyCache = 9;
#endif // DGL_USE_CUDA
};
......@@ -190,7 +261,7 @@ class TensorDispatcher {
void* entrypoints_[num_entries_] = {
nullptr, nullptr,
#ifdef DGL_USE_CUDA
nullptr, nullptr, nullptr, nullptr,
nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr,
#endif // DGL_USE_CUDA
};
......
......@@ -254,12 +254,32 @@ class HeteroGraphIndex(ObjectBase):
"""
return _CAPI_DGLHeteroCopyTo(self, ctx.device_type, ctx.device_id)
def pin_memory(self):
"""Copies the graph structure to pinned memory, if it's not already
pinned.
NOTE: This function is similar to PyTorch's Tensor.pin_memory(), but
tailored for graphs. It utilizes the same pin_memory allocator as
PyTorch, so the lifecycle of the graph is also managed by PyTorch.
If a batch includes a DGL graph object (HeteroGraphIndex),
PyTorch's DataLoader memory pinning logic will detect it and
automatically activate this function when pin_memory=True.
Returns
-------
HeteroGraphIndex
The pinned graph index.
"""
return _CAPI_DGLHeteroPinMemory(self)
def pin_memory_(self):
"""Pin this graph to the page-locked memory.
NOTE: This is an inplace method.
The graph structure must be on CPU to be pinned.
If the graph struture is already pinned, the function directly returns it.
NOTE: This is an in-place method to pin the current graph index, i.e.,
it does not require new memory allocation but simply flags the
existing graph structure to be page-locked. The graph structure
must be on CPU to be pinned. If the graph structure is already
pinned, the function directly returns it.
Returns
-------
......
......@@ -277,6 +277,31 @@ HeteroGraphPtr HeteroGraph::CopyTo(HeteroGraphPtr g, const DGLContext& ctx) {
hgindex->meta_graph_, rel_graphs, hgindex->num_verts_per_type_));
}
HeteroGraphPtr HeteroGraph::PinMemory(HeteroGraphPtr g) {
auto casted_ptr = std::dynamic_pointer_cast<HeteroGraph>(g);
CHECK_NOTNULL(casted_ptr);
auto relation_graphs = casted_ptr->relation_graphs_;
auto it = std::find_if_not(
relation_graphs.begin(), relation_graphs.end(),
[](auto& underlying_g) { return underlying_g->IsPinned(); });
// All underlying relation graphs are pinned, return the input hetero-graph
// directly.
if (it == relation_graphs.end()) return g;
std::vector<HeteroGraphPtr> pinned_relation_graphs(relation_graphs.size());
for (size_t i = 0; i < pinned_relation_graphs.size(); ++i) {
if (!relation_graphs[i]->IsPinned()) {
pinned_relation_graphs[i] = relation_graphs[i]->PinMemory();
} else {
pinned_relation_graphs[i] = relation_graphs[i];
}
}
return HeteroGraphPtr(new HeteroGraph(
casted_ptr->meta_graph_, pinned_relation_graphs,
casted_ptr->num_verts_per_type_));
}
void HeteroGraph::PinMemory_() {
for (auto g : relation_graphs_) g->PinMemory_();
}
......
......@@ -249,6 +249,16 @@ class HeteroGraph : public BaseHeteroGraph {
*/
void UnpinMemory_();
/**
* @brief Copy the current graph to pinned memory managed by
* PyTorch CachingHostAllocator for each relation graph.
* @note If any of the underlying relation graphs are already pinned, the
* function will utilize their existing copies. If all of them are
* pinned, the function will return the original input hetero-graph
* directly.
*/
static HeteroGraphPtr PinMemory(HeteroGraphPtr g);
/**
* @brief Record stream for this graph.
* @param stream The stream that is using the graph
......
......@@ -489,6 +489,13 @@ DGL_REGISTER_GLOBAL("heterograph_index._CAPI_DGLHeteroCopyTo")
*rv = HeteroGraphRef(hg_new);
});
DGL_REGISTER_GLOBAL("heterograph_index._CAPI_DGLHeteroPinMemory")
.set_body([](DGLArgs args, DGLRetValue* rv) {
HeteroGraphRef hg = args[0];
HeteroGraphPtr hg_new = HeteroGraph::PinMemory(hg.sptr());
*rv = HeteroGraphRef(hg_new);
});
DGL_REGISTER_GLOBAL("heterograph_index._CAPI_DGLHeteroPinMemory_")
.set_body([](DGLArgs args, DGLRetValue* rv) {
HeteroGraphRef hg = args[0];
......
......@@ -140,6 +140,15 @@ class UnitGraph::COO : public BaseHeteroGraph {
return COO(meta_graph_, adj_.CopyTo(ctx));
}
/**
* @brief Copy the adj_ to pinned memory.
* @return COOMatrix of the COO graph.
*/
COO PinMemory() {
if (adj_.is_pinned) return *this;
return COO(meta_graph_, adj_.PinMemory());
}
/** @brief Pin the adj_: COOMatrix of the COO graph. */
void PinMemory_() { adj_.PinMemory_(); }
......@@ -535,6 +544,15 @@ class UnitGraph::CSR : public BaseHeteroGraph {
}
}
/**
* @brief Copy the adj_ to pinned memory.
* @return CSRMatrix of the CSR graph.
*/
CSR PinMemory() {
if (adj_.is_pinned) return *this;
return CSR(meta_graph_, adj_.PinMemory());
}
/** @brief Pin the adj_: CSRMatrix of the CSR graph. */
void PinMemory_() { adj_.PinMemory_(); }
......@@ -1259,6 +1277,37 @@ HeteroGraphPtr UnitGraph::CopyTo(HeteroGraphPtr g, const DGLContext& ctx) {
}
}
HeteroGraphPtr UnitGraph::PinMemory() {
CSRPtr pinned_in_csr, pinned_out_csr;
COOPtr pinned_coo;
if (this->in_csr_->defined() && this->in_csr_->IsPinned()) {
pinned_in_csr = this->in_csr_;
} else if (this->in_csr_->defined()) {
pinned_in_csr = CSRPtr(new CSR(this->in_csr_->PinMemory()));
} else {
pinned_in_csr = nullptr;
}
if (this->out_csr_->defined() && this->out_csr_->IsPinned()) {
pinned_out_csr = this->out_csr_;
} else if (this->out_csr_->defined()) {
pinned_out_csr = CSRPtr(new CSR(this->out_csr_->PinMemory()));
} else {
pinned_out_csr = nullptr;
}
if (this->coo_->defined() && this->coo_->IsPinned()) {
pinned_coo = this->coo_;
} else if (this->coo_->defined()) {
pinned_coo = COOPtr(new COO(this->coo_->PinMemory()));
} else {
pinned_coo = nullptr;
}
return HeteroGraphPtr(new UnitGraph(
meta_graph(), pinned_in_csr, pinned_out_csr, pinned_coo, this->formats_));
}
void UnitGraph::PinMemory_() {
if (this->in_csr_->defined()) this->in_csr_->PinMemory_();
if (this->out_csr_->defined()) this->out_csr_->PinMemory_();
......
......@@ -222,6 +222,15 @@ class UnitGraph : public BaseHeteroGraph {
*/
void UnpinMemory_();
/**
* @brief Create a copy of the current graph in pinned memory.
* @note The graph will be pinned out-of-place through PyTorch's
* CachingHostAllocator, if available. Otherwise, an error will be thrown.
* If any of the underlying structures (incsr, outcsr, coo) are already
* pinned, the function will simply reuse their existing copies.
*/
HeteroGraphPtr PinMemory();
/**
* @brief Record stream for this graph.
* @param stream The stream that is using the graph
......
......@@ -126,6 +126,16 @@ bool DeviceAPI::PinData(void* ptr, size_t nbytes) {
return false;
}
void* DeviceAPI::AllocPinnedDataSpace(
size_t nbytes, void** ctx, void** deleter) {
LOG(FATAL) << "Device does not support cudaHostAlloc api.";
return nullptr;
}
void DeviceAPI::FreePinnedDataSpace(void** deleter) {
LOG(FATAL) << "Device does not support cudaHostFree api.";
}
void DeviceAPI::UnpinData(void* ptr) {
LOG(FATAL) << "Device does not support cudaHostUnregister api.";
}
......
......@@ -26,8 +26,9 @@ class CPUDeviceAPI final : public DeviceAPI {
void* AllocDataSpace(
DGLContext ctx, size_t nbytes, size_t alignment,
DGLDataType type_hint) final {
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable()) return td->CPUAllocWorkspace(nbytes);
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable())
return tensor_dispatcher->CPUAllocWorkspace(nbytes);
void* ptr;
#if _MSC_VER || defined(__MINGW32__)
......@@ -44,8 +45,9 @@ class CPUDeviceAPI final : public DeviceAPI {
}
void FreeDataSpace(DGLContext ctx, void* ptr) final {
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable()) return td->CPUFreeWorkspace(ptr);
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable())
return tensor_dispatcher->CPUFreeWorkspace(ptr);
#if _MSC_VER || defined(__MINGW32__)
_aligned_free(ptr);
......@@ -63,6 +65,13 @@ class CPUDeviceAPI final : public DeviceAPI {
static_cast<const char*>(from) + from_offset, size);
}
void RecordedCopyDataFromTo(
void* from, size_t from_offset, void* to, size_t to_offset, size_t size,
DGLContext ctx_from, DGLContext ctx_to, DGLDataType type_hint,
void* pytorch_ctx) final {
BUG_IF_FAIL(false) << "This piece of code should not be reached.";
}
DGLStreamHandle CreateStream(DGLContext) final { return nullptr; }
void StreamSync(DGLContext ctx, DGLStreamHandle stream) final {}
......@@ -84,16 +93,20 @@ struct CPUWorkspacePool : public WorkspacePool {
void* CPUDeviceAPI::AllocWorkspace(
DGLContext ctx, size_t size, DGLDataType type_hint) {
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable()) return td->CPUAllocWorkspace(size);
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable()) {
return tensor_dispatcher->CPUAllocWorkspace(size);
}
return dmlc::ThreadLocalStore<CPUWorkspacePool>::Get()->AllocWorkspace(
ctx, size);
}
void CPUDeviceAPI::FreeWorkspace(DGLContext ctx, void* data) {
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable()) return td->CPUFreeWorkspace(data);
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable()) {
return tensor_dispatcher->CPUFreeWorkspace(data);
}
dmlc::ThreadLocalStore<CPUWorkspacePool>::Get()->FreeWorkspace(ctx, data);
}
......
......@@ -107,10 +107,11 @@ class CUDADeviceAPI final : public DeviceAPI {
DGLDataType type_hint) final {
SetDevice(ctx);
// Redirect to PyTorch's allocator when available.
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable())
return td->CUDAAllocWorkspace(nbytes, getCurrentCUDAStream());
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable()) {
return tensor_dispatcher->CUDAAllocWorkspace(
nbytes, getCurrentCUDAStream());
}
CHECK_EQ(256 % alignment, 0U) << "CUDA space is aligned at 256 bytes";
void* ret;
CUDA_CALL(cudaMalloc(&ret, nbytes));
......@@ -119,9 +120,10 @@ class CUDADeviceAPI final : public DeviceAPI {
void FreeDataSpace(DGLContext ctx, void* ptr) final {
SetDevice(ctx);
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable()) return td->CUDAFreeWorkspace(ptr);
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable()) {
return tensor_dispatcher->CUDAFreeWorkspace(ptr);
}
CUDA_CALL(cudaFree(ptr));
}
......@@ -163,6 +165,28 @@ class CUDADeviceAPI final : public DeviceAPI {
stream);
}
// To ensure correct behavior, `record_event` must be invoked anytime a
// pointer from PyTorch CachingHostAllocator is used in a cudaMemcpyAsync
// call. It provides a way to re-use freed pinned (page-locked) memory
// allocations and avoid device sync due to cudaFreeHost calls.
void RecordedCopyDataFromTo(
void* from, size_t from_offset, void* to, size_t to_offset, size_t size,
DGLContext ctx_from, DGLContext ctx_to, DGLDataType type_hint,
void* pytorch_ctx) final {
auto stream = GetStream();
CopyDataFromTo(
from, from_offset, to, to_offset, size, ctx_from, ctx_to, type_hint,
stream);
auto tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable()) {
auto custream = static_cast<cudaStream_t>(stream);
void* ptr = ctx_to.device_type == kDGLCPU ? to : from;
int id =
ctx_to.device_type == kDGLCPU ? ctx_from.device_id : ctx_to.device_id;
tensor_dispatcher->CUDARecordHostAlloc(ptr, pytorch_ctx, custream, id);
}
}
DGLStreamHandle CreateStream(DGLContext ctx) {
CUDA_CALL(cudaSetDevice(ctx.device_id));
cudaStream_t retval;
......@@ -214,6 +238,12 @@ class CUDADeviceAPI final : public DeviceAPI {
bool PinData(void* ptr, size_t nbytes) override {
// prevent users from pinning empty tensors or graphs
if (ptr == nullptr || nbytes == 0) return false;
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
// Minimize the pinned memory pool allocated by the backend (via
// tensoradapter) to preserve enough memory for DGL's own in-place
// pin-memory operation.
if (tensor_dispatcher->IsAvailable()) {
tensor_dispatcher->CUDAHostAllocatorEmptyCache();
}
CUDA_CALL(cudaHostRegister(ptr, nbytes, cudaHostRegisterDefault));
return true;
}
......@@ -223,6 +253,25 @@ class CUDADeviceAPI final : public DeviceAPI {
CUDA_CALL(cudaHostUnregister(ptr));
}
void* AllocPinnedDataSpace(
size_t nbytes, void** ctx, void** deleter) override {
// prevent pinning empty tensors or graphs
if (nbytes == 0) return nullptr;
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
CHECK(tensor_dispatcher->IsAvailable())
<< "CachingHostAllocator is not available in the current backend "
"PyTorch. Please update the PyTorch version to 1.11+";
return tensor_dispatcher->CUDAAllocHostWorkspace(nbytes, ctx, deleter);
}
void FreePinnedDataSpace(void** deleter) override {
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
CHECK(tensor_dispatcher->IsAvailable())
<< "CachingHostAllocator is not available in the current backend "
"PyTorch. Please update the PyTorch version to 1.11+";
tensor_dispatcher->CUDAFreeHostWorkspace(deleter);
}
bool IsPinned(const void* ptr) override {
// can't be a pinned tensor if CUDA context is unavailable.
if (!is_available_) return false;
......@@ -264,17 +313,19 @@ class CUDADeviceAPI final : public DeviceAPI {
DGLContext ctx, size_t size, DGLDataType type_hint) final {
SetDevice(ctx);
// Redirect to PyTorch's allocator when available.
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable())
return td->CUDAAllocWorkspace(size, getCurrentCUDAStream());
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable())
return tensor_dispatcher->CUDAAllocWorkspace(
size, getCurrentCUDAStream());
return CUDAThreadEntry::ThreadLocal()->pool.AllocWorkspace(ctx, size);
}
void FreeWorkspace(DGLContext ctx, void* data) final {
SetDevice(ctx);
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable()) return td->CUDAFreeWorkspace(data);
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable())
return tensor_dispatcher->CUDAFreeWorkspace(data);
CUDAThreadEntry::ThreadLocal()->pool.FreeWorkspace(ctx, data);
}
......@@ -309,9 +360,9 @@ CUDAThreadEntry* CUDAThreadEntry::ThreadLocal() {
}
cudaStream_t getCurrentCUDAStream() {
TensorDispatcher* td = TensorDispatcher::Global();
if (td->IsAvailable())
return td->CUDAGetCurrentStream();
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
if (tensor_dispatcher->IsAvailable())
return tensor_dispatcher->CUDAGetCurrentStream();
else // return the default stream when TA is not available
return nullptr;
}
......
......@@ -68,8 +68,16 @@ void NDArray::Internal::DefaultDeleter(NDArray::Container* ptr) {
} else if (ptr->dl_tensor.data != nullptr) {
// if the array is still pinned before freeing, unpin it.
if (ptr->pinned_by_dgl_) UnpinContainer(ptr);
dgl::runtime::DeviceAPI::Get(ptr->dl_tensor.ctx)
->FreeDataSpace(ptr->dl_tensor.ctx, ptr->dl_tensor.data);
if (ptr->pinned_by_pytorch_) {
DeviceAPI::Get(kDGLCUDA)->FreePinnedDataSpace(
&(ptr->pytorch_raw_deleter_));
CHECK(ptr->pytorch_raw_deleter_ == nullptr);
ptr->pinned_by_pytorch_ = false;
ptr->pytorch_ctx_ = nullptr;
} else {
dgl::runtime::DeviceAPI::Get(ptr->dl_tensor.ctx)
->FreeDataSpace(ptr->dl_tensor.ctx, ptr->dl_tensor.data);
}
}
delete ptr;
}
......@@ -159,7 +167,6 @@ NDArray NDArray::EmptyShared(
const std::string& name, std::vector<int64_t> shape, DGLDataType dtype,
DGLContext ctx, bool is_create) {
NDArray ret = Internal::Create(shape, dtype, ctx);
// setup memory content
size_t size = GetDataSize(ret.data_->dl_tensor);
auto mem = std::make_shared<SharedMemory>(name);
if (is_create) {
......@@ -175,7 +182,6 @@ NDArray NDArray::EmptyShared(
NDArray NDArray::Empty(
std::vector<int64_t> shape, DGLDataType dtype, DGLContext ctx) {
NDArray ret = Internal::Create(shape, dtype, ctx);
// setup memory content
size_t size = GetDataSize(ret.data_->dl_tensor);
size_t alignment = GetDataAlignment(ret.data_->dl_tensor);
if (size > 0)
......@@ -206,6 +212,44 @@ void NDArray::CopyFromTo(DGLArray* from, DGLArray* to) {
from->dtype);
}
void NDArray::RecordedCopyFromTo(
DGLArray* from, DGLArray* to, void* pytorch_ctx) {
size_t from_size = GetDataSize(*from);
size_t to_size = GetDataSize(*to);
CHECK_EQ(from_size, to_size)
<< "DGLArrayCopyFromTo: The size must exactly match.";
CHECK(from->ctx.device_type != to->ctx.device_type)
<< "Recoding event is only called for the copy between CPU and GPU.";
CHECK(from->ctx.device_type == kDGLCUDA || to->ctx.device_type == kDGLCUDA)
<< "At least one CUDA ctx needs to be involved.";
DeviceAPI::Get(kDGLCUDA)->RecordedCopyDataFromTo(
from->data, static_cast<size_t>(from->byte_offset), to->data,
static_cast<size_t>(to->byte_offset), from_size, from->ctx, to->ctx,
from->dtype, pytorch_ctx);
}
NDArray NDArray::PinnedEmpty(
std::vector<int64_t> shape, DGLDataType dtype, DGLContext ctx) {
CHECK_EQ(ctx.device_type, kDGLCPU) << "Only NDArray on CPU can be pinned";
NDArray ret = Internal::Create(shape, dtype, ctx);
size_t size = GetDataSize(ret.data_->dl_tensor);
if (size > 0) {
ret.data_->dl_tensor.data = DeviceAPI::Get(kDGLCUDA)->AllocPinnedDataSpace(
size, &(ret.data_->pytorch_ctx_), &(ret.data_->pytorch_raw_deleter_));
CHECK(
ret.data_->pytorch_ctx_ != nullptr &&
ret.data_->pytorch_raw_deleter_ != nullptr)
<< "The allocation failed in PyTorch's CachingHostAllocator. "
<< "The returned context pointer is " << ret.data_->pytorch_ctx_
<< " and the function deleter is " << ret.data_->pytorch_raw_deleter_;
ret.data_->pinned_by_pytorch_ = true;
}
return ret;
}
void NDArray::PinContainer(NDArray::Container* ptr) {
if (IsContainerPinned(ptr)) return;
auto* tensor = &(ptr->dl_tensor);
......@@ -229,13 +273,13 @@ void NDArray::UnpinContainer(NDArray::Container* ptr) {
}
void NDArray::RecordStream(DGLArray* tensor, DGLStreamHandle stream) {
TensorDispatcher* td = TensorDispatcher::Global();
CHECK(td->IsAvailable())
<< "RecordStream only works when TensorAdaptor is available.";
TensorDispatcher* tensor_dispatcher = TensorDispatcher::Global();
CHECK(tensor_dispatcher->IsAvailable())
<< "RecordStream only works when TensorAdapter is available.";
CHECK_EQ(tensor->ctx.device_type, kDGLCUDA)
<< "RecordStream only works with GPU tensors.";
td->RecordStream(tensor->data, stream, tensor->ctx.device_id);
tensor_dispatcher->RecordStream(tensor->data, stream, tensor->ctx.device_id);
}
template <typename T>
......@@ -300,7 +344,7 @@ std::shared_ptr<SharedMemory> NDArray::GetSharedMem() const {
}
bool NDArray::IsContainerPinned(NDArray::Container* ptr) {
if (ptr->pinned_by_dgl_) return true;
if (ptr->pinned_by_dgl_ || ptr->pinned_by_pytorch_) return true;
auto* tensor = &(ptr->dl_tensor);
// Can only be pinned if on CPU...
if (tensor->ctx.device_type != kDGLCPU) return false;
......
......@@ -65,6 +65,46 @@ cudaStream_t CUDACurrentStream();
* @param device_id Device of the tensor.
*/
void RecordStream(void* ptr, cudaStream_t stream, int device_id);
/**
* @brief Allocate a piece of pinned CPU memory via
* PyTorch's CachingHostAllocator.
*
* @param nbytes The size to be allocated.
* @param ctx Pointer to the PyTorch storage ctx ptr returned from the
* allocator.
* @param raw_deleter Pointer to the delete function ptr returned from the
* allocator.
* @return Raw pointer to the allocated memory.
*/
void* CUDARawHostAlloc(size_t nbytes, void** ctx, void** raw_deleter);
/**
* @brief 'Free' the pinned CPU memory via
* inserting the memory block back to the free list.
*
* @param raw_deleter Pointer to the delete function ptr returned from the
* allocator.
*/
void CUDARawHostDelete(void** raw_deleter);
/**
* @brief 'Record' a CUDA stream (usually from a copy kernel) for the pinned
* memory via PyTorch's CachingHostAllocator.
*
* @param data Pointer of the tensor to be recorded.
* @param ctx PyTorch storage ctx ptr returned from the allocator.
* @param stream The stream that currently consumes this tensor.
* @param device_id Device of the tensor.
*/
void CUDARecordHostAlloc(
void* data, void* ctx, cudaStream_t stream, int device_id);
/**
* @brief Release cached pinned memory allocations via cudaFreeHost.
*/
void CUDAHostAllocatorEmptyCache();
#endif // DGL_USE_CUDA
}
......
......@@ -8,6 +8,7 @@
#include <tensoradapter_exports.h>
#ifdef DGL_USE_CUDA
#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CachingHostAllocator.h>
#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAStream.h>
#include <cuda_runtime.h>
......@@ -55,6 +56,50 @@ TA_EXPORTS void RecordStream(void* ptr, cudaStream_t stream, int device_id) {
reinterpret_cast<int64_t>(stream))));
data_ptr.release_context();
}
class CUDAHostDeleter {
public:
explicit CUDAHostDeleter(std::unique_ptr<void, c10::DeleterFnPtr> ptr)
: ptr_(std::move(ptr)) {}
private:
std::unique_ptr<void, c10::DeleterFnPtr> ptr_;
};
TA_EXPORTS void* CUDARawHostAlloc(
size_t nbytes, void** ctx, void** raw_deleter) {
auto data_ptr = at::cuda::getCachingHostAllocator()->allocate(nbytes);
auto raw = data_ptr.get();
// Return the raw ctx ptr for recording event.
*ctx = data_ptr.get_context();
// Transfer ownership to raw_deleter.
auto* data_deleter = new CUDAHostDeleter(data_ptr.move_context());
*raw_deleter = static_cast<void*>(data_deleter);
return raw;
}
// Designated deleter for memory obtained from CUDARawHostAlloc.
TA_EXPORTS void CUDARawHostDelete(void** raw_deleter) {
delete static_cast<CUDAHostDeleter*>(*raw_deleter);
*raw_deleter = nullptr;
}
TA_EXPORTS void CUDARecordHostAlloc(
void* ptr, void* ctx, cudaStream_t stream, int device_id) {
at::cuda::CachingHostAllocator_recordEvent(
ptr, ctx,
c10::cuda::CUDAStream(
c10::cuda::CUDAStream::UNCHECKED,
c10::Stream(
c10::Stream::UNSAFE,
c10::Device(c10::DeviceType::CUDA, device_id),
reinterpret_cast<int64_t>(stream))));
}
TA_EXPORTS void CUDAHostAllocatorEmptyCache() {
at::cuda::CachingHostAllocator_emptyCache();
}
#endif // DGL_USE_CUDA
};
......
import unittest
import backend as F
import dgl
import pytest
from dgl import DGLError
from utils import parametrize_idtype
def create_test_heterograph(idtype):
# 3 users, 2 games, 2 developers
# metagraph:
# ('user', 'follows', 'user'),
# ('user', 'plays', 'game'),
# ('user', 'wishes', 'game'),
# ('developer', 'develops', 'game')])
g = dgl.heterograph(
{
("user", "follows", "user"): ([0, 1], [1, 2]),
("user", "plays", "game"): ([0, 1, 2, 1], [0, 0, 1, 1]),
("user", "wishes", "game"): ([0, 2], [1, 0]),
("developer", "develops", "game"): ([0, 1], [0, 1]),
},
idtype=idtype,
device=F.ctx(),
)
assert g.idtype == idtype
assert g.device == F.ctx()
return g
@unittest.skipIf(
F._default_context_str == "cpu", reason="Need gpu for this test"
)
@unittest.skipIf(
dgl.backend.backend_name != "pytorch",
reason="Pinning graph outplace only supported for PyTorch",
)
@parametrize_idtype
def test_pin_memory(idtype):
g = create_test_heterograph(idtype)
g.nodes["user"].data["h"] = F.ones((3, 5))
g.nodes["game"].data["i"] = F.ones((2, 5))
g.edges["plays"].data["e"] = F.ones((4, 4))
g = g.to(F.cpu())
assert not g.is_pinned()
# Test pinning a CPU graph.
g._graph.pin_memory()
assert not g.is_pinned()
g._graph = g._graph.pin_memory()
assert g.is_pinned()
assert g.device == F.cpu()
# When cloning with a new (different) format, e.g., g.formats("csc"),
# ensure the new graphs are not pinned.
assert not g.formats("csc").is_pinned()
assert not g.formats("csr").is_pinned()
# 'coo' format is the default and thus not cloned.
assert g.formats("coo").is_pinned()
# Test that pinning a GPU graph raises an error.
g1 = g.to(F.cuda())
with pytest.raises(DGLError):
g1._graph.pin_memory()
# Test pinning an empty homograph
g2 = dgl.graph(([], []))
g2._graph = g2._graph.pin_memory()
assert g2.is_pinned()
# Test pinning a heterograph with 0 edges in one relation type.
g3 = dgl.heterograph(
{("a", "b", "c"): ([0, 1], [1, 2]), ("c", "d", "c"): ([], [])}
).astype(idtype)
g3._graph = g3._graph.pin_memory()
assert g3.is_pinned()
if __name__ == "__main__":
pass