Unverified Commit 69a532c1 authored by Muhammed Fatih BALIN's avatar Muhammed Fatih BALIN Committed by GitHub
Browse files

[Feature] Gpu cache for node and edge data (#4341)


Co-authored-by: default avatarxiny <xiny@nvidia.com>
parent 7ec78bb6
......@@ -39,6 +39,7 @@ include_patterns = [
'**/*.cu',
]
exclude_patterns = [
'third_party/**',
]
init_command = [
'python3',
......
......@@ -227,6 +227,21 @@ if((NOT MSVC) AND (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin"))
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wl,--exclude-libs,ALL")
endif()
# Compile gpu_cache
if(USE_CUDA)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DUSE_GPU_CACHE")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_GPU_CACHE")
# Manually build gpu_cache because CMake always builds it as shared
file(GLOB gpu_cache_src
third_party/HugeCTR/gpu_cache/src/nv_gpu_cache.cu
)
cuda_add_library(gpu_cache STATIC ${gpu_cache_src})
target_include_directories(gpu_cache PRIVATE "third_party/HugeCTR/gpu_cache/include")
target_include_directories(dgl PRIVATE "third_party/HugeCTR/gpu_cache/include")
list(APPEND DGL_LINKER_LIBS gpu_cache)
message(STATUS "Build with HugeCTR GPU embedding cache.")
endif(USE_CUDA)
# support PARALLEL_ALGORITHMS
if (LIBCXX_ENABLE_PARALLEL_ALGORITHMS)
add_definitions(-DPARALLEL_ALGORITHMS)
......
......@@ -202,59 +202,6 @@ function(dgl_select_nvcc_arch_flags out_variable)
set(${out_variable}_readable ${__nvcc_archs_readable} PARENT_SCOPE)
endfunction()
################################################################################################
# Short command for cuda compilation
# Usage:
# dgl_cuda_compile(<objlist_variable> <cuda_files>)
macro(dgl_cuda_compile objlist_variable)
foreach(var CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_RELEASE CMAKE_CXX_FLAGS_DEBUG)
set(${var}_backup_in_cuda_compile_ "${${var}}")
# we remove /EHa as it generates warnings under windows
string(REPLACE "/EHa" "" ${var} "${${var}}")
endforeach()
if(UNIX OR APPLE)
list(APPEND CUDA_NVCC_FLAGS -Xcompiler -fPIC --std=c++14)
endif()
if(APPLE)
list(APPEND CUDA_NVCC_FLAGS -Xcompiler -Wno-unused-function)
endif()
set(CUDA_NVCC_FLAGS_DEBUG "${CUDA_NVCC_FLAGS_DEBUG} -G")
if(MSVC)
# disable noisy warnings:
# 4819: The file contains a character that cannot be represented in the current code page (number).
list(APPEND CUDA_NVCC_FLAGS -Xcompiler "/wd4819")
foreach(flag_var
CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG CMAKE_CXX_FLAGS_RELEASE
CMAKE_CXX_FLAGS_MINSIZEREL CMAKE_CXX_FLAGS_RELWITHDEBINFO)
if(${flag_var} MATCHES "/MD")
string(REGEX REPLACE "/MD" "/MT" ${flag_var} "${${flag_var}}")
endif(${flag_var} MATCHES "/MD")
endforeach(flag_var)
endif()
# If the build system is a container, make sure the nvcc intermediate files
# go into the build output area rather than in /tmp, which may run out of space
if(IS_CONTAINER_BUILD)
set(CUDA_NVCC_INTERMEDIATE_DIR "${CMAKE_CURRENT_BINARY_DIR}")
message(STATUS "Container build enabled, so nvcc intermediate files in: ${CUDA_NVCC_INTERMEDIATE_DIR}")
list(APPEND CUDA_NVCC_FLAGS "--keep --keep-dir ${CUDA_NVCC_INTERMEDIATE_DIR}")
endif()
cuda_compile(cuda_objcs ${ARGN})
foreach(var CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_RELEASE CMAKE_CXX_FLAGS_DEBUG)
set(${var} "${${var}_backup_in_cuda_compile_}")
unset(${var}_backup_in_cuda_compile_)
endforeach()
set(${objlist_variable} ${cuda_objcs})
endmacro()
################################################################################################
# Config cuda compilation.
# Usage:
......@@ -289,7 +236,7 @@ macro(dgl_config_cuda out_variable)
set(CUDA_PROPAGATE_HOST_FLAGS OFF)
# 0. Add host flags
message(STATUS "${CMAKE_CXX_FLAGS}")
message(STATUS "CMAKE_CXX_FLAGS: ${CMAKE_CXX_FLAGS}")
string(REGEX REPLACE "[ \t\n\r]" "," CXX_HOST_FLAGS "${CMAKE_CXX_FLAGS}")
if(MSVC AND NOT USE_MSVC_MT)
string(CONCAT CXX_HOST_FLAGS ${CXX_HOST_FLAGS} ",/MD")
......@@ -303,14 +250,7 @@ macro(dgl_config_cuda out_variable)
# 2. flags in third_party/moderngpu
list(APPEND CUDA_NVCC_FLAGS "--expt-extended-lambda;-Wno-deprecated-declarations")
# 3. CUDA 11 requires c++14 by default
include(CheckCXXCompilerFlag)
check_cxx_compiler_flag("-std=c++14" SUPPORT_CXX14)
string(REPLACE "-std=c++11" "" CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS}")
list(APPEND CUDA_NVCC_FLAGS "-std=c++14")
message(STATUS "CUDA flags: ${CUDA_NVCC_FLAGS}")
message(STATUS "CUDA_NVCC_FLAGS: ${CUDA_NVCC_FLAGS}")
list(APPEND DGL_LINKER_LIBS
${CUDA_CUDART_LIBRARY}
......
""" CUDA wrappers """
from .. import backend as F
from .gpu_cache import GPUCache
if F.get_preferred_backend() == "pytorch":
from . import nccl
"""API wrapping HugeCTR gpu_cache."""
# Copyright (c) 2022, NVIDIA Corporation
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @file gpu_cache.py
# @brief API for managing a GPU Cache
from .. import backend as F
from .._ffi.function import _init_api
class GPUCache(object):
"""High-level wrapper for GPU embedding cache"""
def __init__(self, num_items, num_feats, idtype=F.int64):
assert idtype in [F.int32, F.int64]
self._cache = _CAPI_DGLGpuCacheCreate(
num_items, num_feats, 32 if idtype == F.int32 else 64
)
self.idtype = idtype
self.total_miss = 0
self.total_queries = 0
def query(self, keys):
"""Queries the GPU cache.
Parameters
----------
keys : Tensor
The keys to query the GPU cache with.
Returns
-------
tuple(Tensor, Tensor, Tensor)
A tuple containing (values, missing_indices, missing_keys) where
values[missing_indices] corresponds to cache misses that should be
filled by quering another source with missing_keys.
"""
self.total_queries += keys.shape[0]
keys = F.astype(keys, self.idtype)
values, missing_index, missing_keys = _CAPI_DGLGpuCacheQuery(
self._cache, F.to_dgl_nd(keys)
)
self.total_miss += missing_keys.shape[0]
return (
F.from_dgl_nd(values),
F.from_dgl_nd(missing_index),
F.from_dgl_nd(missing_keys),
)
def replace(self, keys, values):
"""Inserts key-value pairs into the GPU cache using the Least-Recently
Used (LRU) algorithm to remove old key-value pairs if it is full.
Parameters
----------
keys: Tensor
The keys to insert to the GPU cache.
values: Tensor
The values to insert to the GPU cache.
"""
keys = F.astype(keys, self.idtype)
values = F.astype(values, F.float32)
_CAPI_DGLGpuCacheReplace(
self._cache, F.to_dgl_nd(keys), F.to_dgl_nd(values)
)
@property
def miss_rate(self):
"""Returns the cache miss rate since creation."""
return self.total_miss / self.total_queries
_init_api("dgl.cuda", __name__)
/*!
* Copyright (c) 2022 by Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* \file gpu_cache.cu
* \brief Implementation of wrapper HugeCTR gpu_cache routines.
*/
#ifndef DGL_RUNTIME_CUDA_GPU_CACHE_H_
#define DGL_RUNTIME_CUDA_GPU_CACHE_H_
#include <cuda_runtime.h>
#include <dgl/array.h>
#include <dgl/aten/array_ops.h>
#include <dgl/packed_func_ext.h>
#include <dgl/runtime/container.h>
#include <dgl/runtime/device_api.h>
#include <dgl/runtime/object.h>
#include <dgl/runtime/registry.h>
#include <nv_gpu_cache.hpp>
#include "../../runtime/cuda/cuda_common.h"
namespace dgl {
namespace runtime {
namespace cuda {
template <typename key_t>
class GpuCache : public runtime::Object {
constexpr static int set_associativity = 2;
constexpr static int WARP_SIZE = 32;
constexpr static int bucket_size = WARP_SIZE * set_associativity;
using gpu_cache_t = gpu_cache::gpu_cache<
key_t, uint64_t, std::numeric_limits<key_t>::max(), set_associativity,
WARP_SIZE>;
public:
static constexpr const char *_type_key =
sizeof(key_t) == 4 ? "cuda.GpuCache32" : "cuda.GpuCache64";
DGL_DECLARE_OBJECT_TYPE_INFO(GpuCache, Object);
GpuCache(size_t num_items, size_t num_feats)
: num_feats(num_feats),
cache(std::make_unique<gpu_cache_t>(
(num_items + bucket_size - 1) / bucket_size, num_feats)) {
CUDA_CALL(cudaGetDevice(&cuda_device));
}
std::tuple<NDArray, IdArray, IdArray> Query(IdArray keys) {
const auto &ctx = keys->ctx;
cudaStream_t stream = dgl::runtime::getCurrentCUDAStream();
auto device = dgl::runtime::DeviceAPI::Get(ctx);
CHECK_EQ(ctx.device_type, kDGLCUDA)
<< "The keys should be on a CUDA device";
CHECK_EQ(ctx.device_id, cuda_device)
<< "The keys should be on the correct CUDA device";
CHECK_EQ(keys->ndim, 1)
<< "The tensor of requested indices must be of dimension one.";
NDArray values = NDArray::Empty(
{keys->shape[0], (int64_t)num_feats}, DGLDataType{kDGLFloat, 32, 1},
ctx);
IdArray missing_index = aten::NewIdArray(keys->shape[0], ctx, 64);
IdArray missing_keys =
aten::NewIdArray(keys->shape[0], ctx, sizeof(key_t) * 8);
size_t *missing_len =
static_cast<size_t *>(device->AllocWorkspace(ctx, sizeof(size_t)));
cache->Query(
static_cast<const key_t *>(keys->data), keys->shape[0],
static_cast<float *>(values->data),
static_cast<uint64_t *>(missing_index->data),
static_cast<key_t *>(missing_keys->data), missing_len, stream);
size_t missing_len_host;
device->CopyDataFromTo(
missing_len, 0, &missing_len_host, 0, sizeof(missing_len_host), ctx,
DGLContext{kDGLCPU, 0}, keys->dtype);
device->FreeWorkspace(ctx, missing_len);
missing_index = missing_index.CreateView(
{(int64_t)missing_len_host}, missing_index->dtype);
missing_keys =
missing_keys.CreateView({(int64_t)missing_len_host}, keys->dtype);
return std::make_tuple(values, missing_index, missing_keys);
}
void Replace(IdArray keys, NDArray values) {
cudaStream_t stream = dgl::runtime::getCurrentCUDAStream();
CHECK_EQ(keys->ctx.device_type, kDGLCUDA)
<< "The keys should be on a CUDA device";
CHECK_EQ(keys->ctx.device_id, cuda_device)
<< "The keys should be on the correct CUDA device";
CHECK_EQ(values->ctx.device_type, kDGLCUDA)
<< "The values should be on a CUDA device";
CHECK_EQ(values->ctx.device_id, cuda_device)
<< "The values should be on the correct CUDA device";
CHECK_EQ(keys->shape[0], values->shape[0])
<< "First dimensions of keys and values must match";
CHECK_EQ(values->shape[1], num_feats) << "Embedding dimension must match";
cache->Replace(
static_cast<const key_t *>(keys->data), keys->shape[0],
static_cast<const float *>(values->data), stream);
}
private:
size_t num_feats;
std::unique_ptr<gpu_cache_t> cache;
int cuda_device;
};
static_assert(sizeof(unsigned int) == 4);
DGL_DEFINE_OBJECT_REF(GpuCacheRef32, GpuCache<unsigned int>);
// The cu file in HugeCTR gpu cache uses unsigned int and long long.
// Changing to int64_t results in a mismatch of template arguments.
static_assert(sizeof(long long) == 8); // NOLINT
DGL_DEFINE_OBJECT_REF(GpuCacheRef64, GpuCache<long long>); // NOLINT
/* CAPI **********************************************************************/
using namespace dgl::runtime;
DGL_REGISTER_GLOBAL("cuda._CAPI_DGLGpuCacheCreate")
.set_body([](DGLArgs args, DGLRetValue *rv) {
const size_t num_items = args[0];
const size_t num_feats = args[1];
const int num_bits = args[2];
if (num_bits == 32)
*rv = GpuCacheRef32(
std::make_shared<GpuCache<unsigned int>>(num_items, num_feats));
else
*rv = GpuCacheRef64(std::make_shared<GpuCache<long long>>( // NOLINT
num_items, num_feats));
});
DGL_REGISTER_GLOBAL("cuda._CAPI_DGLGpuCacheQuery")
.set_body([](DGLArgs args, DGLRetValue *rv) {
IdArray keys = args[1];
List<ObjectRef> ret;
if (keys->dtype.bits == 32) {
GpuCacheRef32 cache = args[0];
auto result = cache->Query(keys);
ret.push_back(Value(MakeValue(std::get<0>(result))));
ret.push_back(Value(MakeValue(std::get<1>(result))));
ret.push_back(Value(MakeValue(std::get<2>(result))));
} else {
GpuCacheRef64 cache = args[0];
auto result = cache->Query(keys);
ret.push_back(Value(MakeValue(std::get<0>(result))));
ret.push_back(Value(MakeValue(std::get<1>(result))));
ret.push_back(Value(MakeValue(std::get<2>(result))));
}
*rv = ret;
});
DGL_REGISTER_GLOBAL("cuda._CAPI_DGLGpuCacheReplace")
.set_body([](DGLArgs args, DGLRetValue *rv) {
IdArray keys = args[1];
NDArray values = args[2];
if (keys->dtype.bits == 32) {
GpuCacheRef32 cache = args[0];
cache->Replace(keys, values);
} else {
GpuCacheRef64 cache = args[0];
cache->Replace(keys, values);
}
*rv = List<ObjectRef>{};
});
} // namespace cuda
} // namespace runtime
} // namespace dgl
#endif
#
# Copyright (c) 2022 by Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
import backend as F
import dgl
from utils import parametrize_idtype
D = 5
def generate_graph(idtype, grad=False, add_data=True):
g = dgl.DGLGraph().to(F.ctx(), dtype=idtype)
g.add_nodes(10)
u, v = [], []
# create a graph where 0 is the source and 9 is the sink
for i in range(1, 9):
u.append(0)
v.append(i)
u.append(i)
v.append(9)
# add a back flow from 9 to 0
u.append(9)
v.append(0)
g.add_edges(u, v)
if add_data:
ncol = F.randn((10, D))
ecol = F.randn((17, D))
if grad:
ncol = F.attach_grad(ncol)
ecol = F.attach_grad(ecol)
g.ndata["h"] = ncol
g.edata["l"] = ecol
return g
@unittest.skipIf(not F.gpu_ctx(), reason="only necessary with GPU")
@parametrize_idtype
def test_gpu_cache(idtype):
g = generate_graph(idtype)
cache = dgl.cuda.GPUCache(5, D, idtype)
h = g.ndata["h"]
t = 5
keys = F.arange(0, t, dtype=idtype)
values, m_idx, m_keys = cache.query(keys)
m_values = h[F.tensor(m_keys, F.int64)]
values[F.tensor(m_idx, F.int64)] = m_values
cache.replace(m_keys, m_values)
keys = F.arange(3, 8, dtype=idtype)
values, m_idx, m_keys = cache.query(keys)
assert m_keys.shape[0] == 3 and m_idx.shape[0] == 3
m_values = h[F.tensor(m_keys, F.int64)]
values[F.tensor(m_idx, F.int64)] = m_values
assert (values != h[F.tensor(keys, F.int64)]).sum().item() == 0
cache.replace(m_keys, m_values)
if __name__ == "__main__":
test_gpu_cache(F.int64)
test_gpu_cache(F.int32)
# GPU Embedding Cache
This project implements an embedding cache on GPU memory that is designed for CTR inference and training workload.
The cache stores the hot pairs, (embedding id, embedding vectors), on GPU memory.
Storing the data on GPU memory reduces the traffic to the parameter server when performing embedding table lookup.
The cache is designed for CTR inference and training, it has following features and restrictions:
* All the backup memory-side operations are performed by the parameter server.
These operations include prefetching, latency hiding, and so on.
* This is a single-GPU design.
Each cache belongs to one GPU.
* The cache is thread-safe: multiple workers, CPU threads, can concurrently call the API of a single cache object with well-defined behavior.
* The cache implements a least recently used (LRU) replacement algorithm so that it caches the most recently queried embeddings.
* The embeddings stored inside the cache are unique: there are no duplicated embedding IDs in the cache.
## Project Structure
This project is a stand-alone module in HugeCTR project.
The root folder of this project is the `gpu_cache` folder under the HugeCTR root directory.
The `include` folder contains the headers for the cache library and the `src` folder contains the implementations and Makefile for the cache library.
The `test` folder contains a test that tests the correctness and performance of the GPU embedding cache.
The test also acts as sample code that shows how to use the cache.
The `nv_gpu_cache.hpp` file contains the definition of the main class, `gpu_cache`, that implements the GPU embedding cache.
The `nv_gpu_cache.cu` file contains the implementation.
As a module of HugeCTR, this project is built with and used by the HugeCTR project.
## Supported Data Types
* The cache supports 32 and 64-bit scalar integer types for the key (embedding ID) type.
For example, the data type declarations `unsigned int` and `long long` match these integer types.
* The cache supports a vector of floats for the value (embedding vector) type.
* You need to specify an empty key to indicate the empty bucket.
Do not use an empty key to represent any real key.
* Refer to the instantiation code at the end of the `nv_gpu_cache.cu` file for template parameters.
## Requirements
* NVIDIA GPU >= Volta (SM 70).
* CUDA environment >= 11.0.
* (Optional) libcu++ library >= 1.1.0.
The CUDA Toolkit 11.0 (Early Access) and above meets the required library version.
Using the libcu++ library provides better performance and more precisely-defined behavior.
You can enable libcu++ library by defining the `LIBCUDACXX_VERSION` macro when you compile.
Otherwise, the libcu++ library is not enabled.
* The default building option for HugeCTR is to disable the libcu++ library.
## Usage Overview
```c++
template<typename key_type,
typename ref_counter_type,
key_type empty_key,
int set_associativity,
int warp_size,
typename set_hasher = MurmurHash3_32<key_type>,
typename slab_hasher = Mod_Hash<key_type, size_t>>
class gpu_cache{
public:
//Ctor
gpu_cache(const size_t capacity_in_set, const size_t embedding_vec_size);
//Dtor
~gpu_cache();
// Query API, i.e. A single read from the cache
void Query(const key_type* d_keys,
const size_t len,
float* d_values,
uint64_t* d_missing_index,
key_type* d_missing_keys,
size_t* d_missing_len,
cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO);
// Replace API, i.e. Follow the Query API to update the content of the cache to Most Recent
void Replace(const key_type* d_keys,
const size_t len,
const float* d_values,
cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO);
// Update API, i.e. update the embeddings which exist in the cache
void Update(const key_type* d_keys,
const size_t len,
const float* d_values,
cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO);
// Dump API, i.e. dump some slabsets' keys from the cache
void Dump(key_type* d_keys,
size_t* d_dump_counter,
const size_t start_set_index,
const size_t end_set_index,
cudaStream_t stream);
};
```
## API
`Constructor`
To create a new embedding cache, you need to provide the following:
* Template parameters:
+ key_type: the data type of embedding ID.
+ ref_counter_type: the data type of the internal counter. This data type should be 64bit unsigned integer(i.e. uint64_t), 32bit integer has the risk of overflow.
+ empty_key: the key value indicate for empty bucket(i.e. The empty key), user should never use empty key value to represent any real keys.
+ set_associativity: the hyper-parameter indicates how many slabs per cache set.(See `Performance hint` session below)
+ warp_size: the hyper-parameter indicates how many [key, value] pairs per slab. Acceptable value includes 1/2/4/8/16/32.(See `Performance hint` session below)
+ For other template parameters just use the default value.
* Parameters:
+ capacity_in_set: # of cache set in the embedding cache. So the total capacity of the embedding cache is `warp_size * set_associativity * capacity_in_set` [key, value] pairs.
+ embedding_vec_size: # of float per a embedding vector.
* The host thread will wait for the GPU kernels to complete before returning from the API, thus this API is synchronous with CPU thread. When returned, the initialization process of the cache is already done.
* The embedding cache will be created on the GPU where user call the constructor. Thus, user should set the host thread to the target CUDA device before creating the embedding cache. All resources(i.e. device-side buffers, CUDA streams) used later for this embedding cache should be allocated on the same CUDA device as the embedding cache.
* The constructor can be called only once, thus is not thread-safe.
`Destructor`
* The destructor clean up the embedding cache. This API should be called only once when user need to delete the embedding cache object, thus is not thread-safe.
`Query`
* Search `len` elements from device-side buffers `d_keys` in the cache and return the result in device-side buffer `d_values` if a key is hit in the cache.
* If a key is missing, the missing key and its index in the `d_keys` buffer will be returned in device-side buffers `d_missing_keys` and `d_missing_index`. The # of missing key will be return in device-side buffer `d_missing_len`. For simplicity, these buffers should have the same length as `d_keys` to avoid out-of-bound access.
* The GPU kernels will be launched in `stream` CUDA stream.
* The host thread will return from the API immediately after the kernels are launched, thus this API is Asynchronous with CPU thread.
* The keys to be queried in the `d_keys` buffer can have duplication. In this case, user will get duplicated returned values or missing information.
* This API is thread-safe and can be called concurrently with other APIs.
* For hyper-parameter `task_per_warp_tile`, see `Performance hint` session below.
`Replace`
* The API will replace `len` [key, value] pairs listed in `d_keys` and `d_values` into the embedding cache using the LRU replacement algorithm.
* The GPU kernels will be launched in `stream` CUDA stream.
* The host thread will return from the API immediately after the kernels are launched, thus this API is Asynchronous with CPU thread.
* The keys to be replaced in the `d_keys` buffer can have duplication and can be already stored inside the cache. In these cases, the cache will detect any possible duplication and maintain the uniqueness of all the [key ,value] pairs stored in the cache.
* This API is thread-safe and can be called concurrently with other APIs.
* This API will first try to insert the [key, value] pairs into the cache if there is any empty slot. If the cache is full, it will do the replacement.
* For hyper-parameter `task_per_warp_tile`, see `Performance hint` session below.
`Update`
* The API will search for `len` keys listed in `d_keys` buffer within the cache. If a key is found in the cache, this API will update the value associated with the key to the corresponding values provided in `d_values` buffer. If a key is not found in the cache, this API will do nothing to this key.
* The GPU kernels will be launched in `stream` CUDA stream.
* The host thread will return from the API immediately after the kernels are launched, thus this API is Asynchronous with CPU thread.
* If the keys to be updated in the `d_keys` buffer have duplication, all values associated with this key in the `d_values` buffer will be updated to the cache atomically. The final result depends on the order of updating the value.
* This API is thread-safe and can be called concurrently with other APIs.
* For hyper-parameter `task_per_warp_tile`, see `Performance hint` session below.
`Dump`
* The API will dump all the keys stored in [`start_set_index`, `end_set_index`) cache sets to `d_keys` buffer as a linear array(the key order is not guaranteed). The total # of keys dumped will be reported in `d_dump_counter` variable.
* The GPU kernels will be launched in `stream` CUDA stream.
* The host thread will return from the API immediately after the kernels are launched, thus this API is Asynchronous with CPU thread.
* This API is thread-safe and can be called concurrently with other APIs.
## More Information
* The detailed introduction of the GPU embedding cache data structure is presented at GTC China 2020: https://on-demand-gtc.gputechconf.com/gtcnew/sessionview.php?sessionName=cns20626-%e4%bd%bf%e7%94%a8+gpu+embedding+cache+%e5%8a%a0%e9%80%9f+ctr+%e6%8e%a8%e7%90%86%e8%bf%87%e7%a8%8b
* The `test` folder contains a example of using the GPU embedding cache.
* This project is used by `embedding_cache` class in `HugeCTR/include/inference/embedding_cache.hpp` which can be used as an example.
## Performance Hint
* The hyper-parameter `warp_size` should be keep as 32 by default. When the length for Query or Replace operations is small(~1-50k), user can choose smaller warp_size and increase the total # of cache set(while maintaining the same cache size) to increase the parallelism and improve the performance.
* The hyper-parameter `set_associativity` is critical to performance:
+ If set too small, may cause load imbalance between different cache sets(lower down the effective capacity of the cache, lower down the hit rate). To prevent this, the embedding cache uses a very random hash function to hash the keys to different cache set, thus will achieve load balance statistically. However, larger cache set will tends to have better load balance.
+ If set too large, the searching space for a single key will be very large. The performance of the embedding cache API will drop dramatically. Also, each set will be accessed exclusively, thus the more cache sets the higher parallelism can be achieved.
+ Recommend setting `set_associativity` to 2 or 4.
* The runtime hyper-parameter `task_per_warp_tile` is set to 1 as default parameter, thus users don't need to change their code to accommodate this interface change. This hyper-parameter determines how many keys are been queried/replaced/updated by a single warp tile. The acceptable value is between [1, `warp_size`]. For small to medium size operations to the cache, less task per warp tile can increase the total # of warp tiles running concurrently on the GPU chip, thus can bring significant performance improvement. For large size operations to the cache, the increased # of warp tile will not bring any performance improvement(even a little regression on the performance, ~5%). User can choose the value for this parameter based on the value of `len` parameter.
* The GPU is designed for optimizing throughput. Always try to batch up the inference task and try to have larger `query_size`.
* As the APIs of the embedding cache is asynchronous with host threads. Try to optimize the E2E inference pipeline by overlapping asynchronous tasks on GPU or between CPU and GPU. For example, after retrieving the missing values from the parameter server, user can combine the missing values with the hit values and do the rest of inference pipeline at the same time with the `Replace` API. Replacement is not necessarily happens together with Query all the time, user can do query multiple times then do a replacement if the hit rate is acceptable.
* Try different cache capacity and evaluate the hit rate. If the capacity of embedding cache can be larger than actual embedding footprint, the hit rate can be as high as 99%+.
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <nv_util.h>
#define TASK_PER_WARP_TILE_MACRO 1
namespace gpu_cache {
///////////////////////////////////////////////////////////////////////////////////////////////////
// GPU Cache API
template <typename key_type>
class gpu_cache_api {
public:
virtual ~gpu_cache_api() noexcept(false) {}
// Query API, i.e. A single read from the cache
virtual void Query(const key_type* d_keys, const size_t len, float* d_values,
uint64_t* d_missing_index, key_type* d_missing_keys, size_t* d_missing_len,
cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO) = 0;
// Replace API, i.e. Follow the Query API to update the content of the cache to Most Recent
virtual void Replace(const key_type* d_keys, const size_t len, const float* d_values,
cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO) = 0;
// Update API, i.e. update the embeddings which exist in the cache
virtual void Update(const key_type* d_keys, const size_t len, const float* d_values,
cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO) = 0;
// Dump API, i.e. dump some slabsets' keys from the cache
virtual void Dump(key_type* d_keys, size_t* d_dump_counter, const size_t start_set_index,
const size_t end_set_index, cudaStream_t stream) = 0;
};
} // namespace gpu_cache
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// MurmurHash3_32 implementation from
// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
// Note - The x86 and x64 versions do _not_ produce the same results, as the
// algorithms are optimized for their respective platforms. You can still
// compile and run any of them on any platform, but your performance with the
// non-native version will be less than optimal.
template <typename Key, uint32_t m_seed = 0>
struct MurmurHash3_32 {
using argument_type = Key;
using result_type = uint32_t;
/*__forceinline__
__host__ __device__
MurmurHash3_32() : m_seed( 0 ) {}*/
__forceinline__ __host__ __device__ static uint32_t rotl32(uint32_t x, int8_t r) {
return (x << r) | (x >> (32 - r));
}
__forceinline__ __host__ __device__ static uint32_t fmix32(uint32_t h) {
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
return h;
}
/* --------------------------------------------------------------------------*/
/**
* @Synopsis Combines two hash values into a new single hash value. Called
* repeatedly to create a hash value from several variables.
* Taken from the Boost hash_combine function
* https://www.boost.org/doc/libs/1_35_0/doc/html/boost/hash_combine_id241013.html
*
* @Param lhs The first hash value to combine
* @Param rhs The second hash value to combine
*
* @Returns A hash value that intelligently combines the lhs and rhs hash values
*/
/* ----------------------------------------------------------------------------*/
__host__ __device__ static result_type hash_combine(result_type lhs, result_type rhs) {
result_type combined{lhs};
combined ^= rhs + 0x9e3779b9 + (combined << 6) + (combined >> 2);
return combined;
}
__forceinline__ __host__ __device__ static result_type hash(const Key& key) {
constexpr int len = sizeof(argument_type);
const uint8_t* const data = (const uint8_t*)&key;
constexpr int nblocks = len / 4;
uint32_t h1 = m_seed;
constexpr uint32_t c1 = 0xcc9e2d51;
constexpr uint32_t c2 = 0x1b873593;
//----------
// body
const uint32_t* const blocks = (const uint32_t*)(data + nblocks * 4);
for (int i = -nblocks; i; i++) {
uint32_t k1 = blocks[i]; // getblock32(blocks,i);
k1 *= c1;
k1 = rotl32(k1, 15);
k1 *= c2;
h1 ^= k1;
h1 = rotl32(h1, 13);
h1 = h1 * 5 + 0xe6546b64;
}
//----------
// tail
const uint8_t* tail = (const uint8_t*)(data + nblocks * 4);
uint32_t k1 = 0;
switch (len & 3) {
case 3:
k1 ^= tail[2] << 16;
case 2:
k1 ^= tail[1] << 8;
case 1:
k1 ^= tail[0];
k1 *= c1;
k1 = rotl32(k1, 15);
k1 *= c2;
h1 ^= k1;
};
//----------
// finalization
h1 ^= len;
h1 = fmix32(h1);
return h1;
}
__host__ __device__ __forceinline__ result_type operator()(const Key& key) const {
return this->hash(key);
}
};
template <typename key_type, typename index_type, index_type result>
struct Fix_Hash {
using result_type = index_type;
__forceinline__ __host__ __device__ static index_type hash(const key_type& key) { return result; }
};
template <typename key_type, typename result_type>
struct Mod_Hash {
__forceinline__ __host__ __device__ static result_type hash(const key_type& key) {
return (result_type)key;
}
};
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <nv_util.h>
#include <cstdio>
#include <hash_functions.cuh>
#include <limits>
#include "gpu_cache_api.hpp"
#ifdef LIBCUDACXX_VERSION
#include <cuda/std/atomic>
#include <cuda/std/semaphore>
#endif
#define SET_ASSOCIATIVITY 2
#define SLAB_SIZE 32
#define TASK_PER_WARP_TILE_MACRO 1
namespace gpu_cache {
// slab for static slab list
template <typename key_type, int warp_size>
struct static_slab {
key_type slab_[warp_size];
};
// Static slablist(slabset) for GPU Cache
template <int set_associativity, typename key_type, int warp_size>
struct slab_set {
static_slab<key_type, warp_size> set_[set_associativity];
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// GPU Cache
template <typename key_type, typename ref_counter_type, key_type empty_key, int set_associativity,
int warp_size, typename set_hasher = MurmurHash3_32<key_type>,
typename slab_hasher = Mod_Hash<key_type, size_t>>
class gpu_cache : public gpu_cache_api<key_type> {
public:
// Ctor
gpu_cache(const size_t capacity_in_set, const size_t embedding_vec_size);
// Dtor
~gpu_cache();
// Query API, i.e. A single read from the cache
void Query(const key_type* d_keys, const size_t len, float* d_values, uint64_t* d_missing_index,
key_type* d_missing_keys, size_t* d_missing_len, cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO) override;
// Replace API, i.e. Follow the Query API to update the content of the cache to Most Recent
void Replace(const key_type* d_keys, const size_t len, const float* d_values, cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO) override;
// Update API, i.e. update the embeddings which exist in the cache
void Update(const key_type* d_keys, const size_t len, const float* d_values, cudaStream_t stream,
const size_t task_per_warp_tile = TASK_PER_WARP_TILE_MACRO) override;
// Dump API, i.e. dump some slabsets' keys from the cache
void Dump(key_type* d_keys, size_t* d_dump_counter, const size_t start_set_index,
const size_t end_set_index, cudaStream_t stream) override;
public:
using slabset = slab_set<set_associativity, key_type, warp_size>;
#ifdef LIBCUDACXX_VERSION
using atomic_ref_counter_type = cuda::atomic<ref_counter_type, cuda::thread_scope_device>;
using mutex = cuda::binary_semaphore<cuda::thread_scope_device>;
#endif
private:
static const size_t BLOCK_SIZE_ = 64;
// Cache data
slabset* keys_;
float* vals_;
ref_counter_type* slot_counter_;
// Global counter
#ifdef LIBCUDACXX_VERSION
atomic_ref_counter_type* global_counter_;
#else
ref_counter_type* global_counter_;
#endif
// CUDA device
int dev_;
// Cache capacity
size_t capacity_in_set_;
size_t num_slot_;
// Embedding vector size
size_t embedding_vec_size_;
#ifdef LIBCUDACXX_VERSION
// Array of mutex to protect (sub-)warp-level data structure, each mutex protect 1 slab set
mutex* set_mutex_;
#else
// Array of flag to protect (sub-)warp-level data structure, each flag act as a mutex and protect
// 1 slab set 1 for unlock, 0 for lock
int* set_mutex_;
#endif
};
} // namespace gpu_cache
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cuda_runtime_api.h>
#include <stdexcept>
#include <string>
#define CUDA_CHECK(val) \
{ nv::cuda_check_((val), __FILE__, __LINE__); }
namespace nv {
class CudaException : public std::runtime_error {
public:
CudaException(const std::string& what) : runtime_error(what) {}
};
inline void cuda_check_(cudaError_t val, const char* file, int line) {
if (val != cudaSuccess) {
throw CudaException(std::string(file) + ":" + std::to_string(line) + ": CUDA error " +
std::to_string(val) + ": " + cudaGetErrorString(val));
}
}
class CudaDeviceRestorer {
public:
CudaDeviceRestorer() { CUDA_CHECK(cudaGetDevice(&dev_)); }
~CudaDeviceRestorer() { CUDA_CHECK(cudaSetDevice(dev_)); }
void check_device(int device) const {
if (device != dev_) {
throw std::runtime_error(
std::string(__FILE__) + ":" + std::to_string(__LINE__) +
": Runtime Error: The device id in the context is not consistent with configuration");
}
}
private:
int dev_;
};
inline int get_dev(const void* ptr) {
cudaPointerAttributes attr;
CUDA_CHECK(cudaPointerGetAttributes(&attr, ptr));
int dev = -1;
#if CUDART_VERSION >= 10000
if (attr.type == cudaMemoryTypeDevice)
#else
if (attr.memoryType == cudaMemoryTypeDevice)
#endif
{
dev = attr.device;
}
return dev;
}
inline void switch_to_dev(const void* ptr) {
int dev = get_dev(ptr);
if (dev >= 0) {
CUDA_CHECK(cudaSetDevice(dev));
}
}
} // namespace nv
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <nv_util.h>
#include <hash_functions.cuh>
namespace gpu_cache {
template <typename key_type, typename value_type, unsigned int tile_size = 4,
unsigned int group_size = 16, typename hasher = MurmurHash3_32<key_type>>
class StaticHashTable {
public:
using size_type = uint32_t;
static_assert(sizeof(key_type) <= 8, "sizeof(key_type) cannot be larger than 8 bytes");
static_assert(sizeof(key_type) >= sizeof(size_type),
"sizeof(key_type) cannot be smaller than sizeof(size_type)");
static_assert((group_size & (group_size - 1)) == 0, "group_size must be a power of 2");
static_assert(group_size > 1, "group_size must be larger than 1");
// User can use empty_key as input without affecting correctness,
// since we will handle it inside kernel.
constexpr static key_type empty_key = ~(key_type)0;
constexpr static size_type invalid_slot = ~(size_type)0;
public:
StaticHashTable(size_type capacity, int value_dim = 1, hasher hash = hasher{});
~StaticHashTable();
inline size_type size() const { return size_; }
inline size_type capacity() const { return value_capacity_; }
inline size_type key_capacity() const { return key_capacity_; }
inline size_t memory_usage() const {
size_t keys_bytes = sizeof(key_type) * (key_capacity_ + 1);
size_t indices_bytes = sizeof(size_type) * (key_capacity_ + 1);
size_t values_bytes = sizeof(value_type) * value_capacity_ * value_dim_;
return keys_bytes + indices_bytes + values_bytes;
}
void clear(cudaStream_t stream = 0);
// Note:
// 1. Please make sure the key to be inserted is not duplicated.
// 2. Please make sure the key to be inserted does not exist in the table.
// 3. Please make sure (size() + num_keys) <= capacity().
void insert(const key_type *keys, const value_type *values, size_type num_keys,
cudaStream_t stream = 0);
void lookup(const key_type *keys, value_type *values, int num_keys, value_type default_value = 0,
cudaStream_t stream = 0);
private:
key_type *table_keys_;
size_type *table_indices_;
size_type key_capacity_;
value_type *table_values_;
size_type value_capacity_;
int value_dim_;
size_type size_;
hasher hash_;
};
} // namespace gpu_cache
\ No newline at end of file
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <nv_util.h>
#include <cstdio>
#include <limits>
#include <static_hash_table.hpp>
namespace gpu_cache {
///////////////////////////////////////////////////////////////////////////////////////////////////
template <typename key_type>
class static_table {
public:
// Ctor
static_table(const size_t table_size, const size_t embedding_vec_size,
const float default_value = 0);
// Dtor
~static_table(){};
// Query API, i.e. A single read from the cache
void Query(const key_type* d_keys, const size_t len, float* d_values, cudaStream_t stream);
// Replace API, i.e. Follow the Query API to update the content of the cache to Most Recent
void Init(const key_type* d_keys, const size_t len, const float* d_values, cudaStream_t stream);
void Clear(cudaStream_t stream);
private:
StaticHashTable<key_type, float> static_hash_table_;
// Embedding vector size
size_t embedding_vec_size_;
size_t table_size_;
float default_value_;
};
} // namespace gpu_cache
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <nv_util.h>
#include <thread>
#include <unordered_map>
#include <vector>
namespace gpu_cache {
template <typename key_type, typename index_type>
class HashBlock {
public:
key_type* keys;
size_t num_sets;
size_t capacity;
HashBlock(size_t expected_capacity, int set_size, int batch_size);
~HashBlock();
void add(const key_type* new_keys, const size_t num_keys, key_type* missing_keys,
int* num_missing_keys, cudaStream_t stream);
void query(const key_type* query_keys, const size_t num_keys, index_type* output_indices,
key_type* missing_keys, int* missing_positions, int* num_missing_keys,
cudaStream_t stream);
void query(const key_type* query_keys, int* num_keys, index_type* output_indices,
cudaStream_t stream);
void clear(cudaStream_t stream);
private:
int max_set_size_;
int batch_size_;
int* set_sizes_;
};
template <typename vec_type>
class H2HCopy {
public:
H2HCopy(int num_threads) : num_threads_(num_threads), working_(num_threads) {
for (int i = 0; i < num_threads_; i++) {
threads_.emplace_back(
[&](int idx) {
while (!terminate_) {
if (working_[idx].load(std::memory_order_relaxed)) {
working_[idx].store(false, std::memory_order_relaxed);
if (num_keys_ == 0) continue;
size_t num_keys_this_thread = (num_keys_ - 1) / num_threads_ + 1;
size_t begin = idx * num_keys_this_thread;
if (idx == num_threads_ - 1) {
num_keys_this_thread = num_keys_ - num_keys_this_thread * idx;
}
size_t end = begin + num_keys_this_thread;
for (size_t i = begin; i < end; i++) {
size_t idx_vec = get_index_(i);
if (idx_vec == std::numeric_limits<size_t>::max()) {
continue;
}
memcpy(dst_data_ptr_ + i * vec_size_, src_data_ptr_ + idx_vec * vec_size_,
sizeof(vec_type) * vec_size_);
}
num_finished_workers_++;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
},
i);
}
};
void copy(vec_type* dst_data_ptr, vec_type* src_data_ptr, size_t num_keys, int vec_size,
std::function<size_t(size_t)> get_index_func) {
std::lock_guard<std::mutex> guard(submit_mutex_);
dst_data_ptr_ = dst_data_ptr;
src_data_ptr_ = src_data_ptr;
get_index_ = get_index_func;
num_keys_ = num_keys;
vec_size_ = vec_size;
num_finished_workers_.store(0, std::memory_order_acquire);
for (auto& working : working_) {
working.store(true, std::memory_order_relaxed);
}
while (num_finished_workers_ != num_threads_) {
continue;
}
}
~H2HCopy() {
terminate_ = true;
for (auto& t : threads_) {
t.join();
}
}
private:
vec_type* src_data_ptr_;
vec_type* dst_data_ptr_;
std::function<size_t(size_t)> get_index_;
size_t num_keys_;
int vec_size_;
std::mutex submit_mutex_;
const int num_threads_;
std::vector<std::thread> threads_;
std::vector<std::atomic<bool>> working_;
volatile bool terminate_{false};
std::atomic<int> num_finished_workers_{0};
};
template <typename key_type, typename index_type, typename vec_type = float>
class UvmTable {
public:
UvmTable(const size_t device_table_capacity, const size_t host_table_capacity,
const int max_batch_size, const int vec_size,
const vec_type default_value = (vec_type)0);
~UvmTable();
void query(const key_type* d_keys, const int len, vec_type* d_vectors, cudaStream_t stream = 0);
void add(const key_type* h_keys, const vec_type* h_vectors, const size_t len);
void clear(cudaStream_t stream = 0);
private:
static constexpr int num_buffers_ = 2;
key_type* d_keys_buffer_;
vec_type* d_vectors_buffer_;
vec_type* d_vectors_;
index_type* d_output_indices_;
index_type* d_output_host_indices_;
index_type* h_output_host_indices_;
key_type* d_missing_keys_;
int* d_missing_positions_;
int* d_missing_count_;
std::vector<vec_type> h_vectors_;
key_type* h_missing_keys_;
cudaStream_t query_stream_;
cudaEvent_t query_event_;
vec_type* h_cpy_buffers_[num_buffers_];
vec_type* d_cpy_buffers_[num_buffers_];
cudaStream_t cpy_streams_[num_buffers_];
cudaEvent_t cpy_events_[num_buffers_];
std::unordered_map<key_type, index_type> h_final_missing_items_;
int max_batch_size_;
int vec_size_;
size_t num_set_;
size_t num_host_set_;
size_t table_capacity_;
std::vector<vec_type> default_vector_;
HashBlock<key_type, index_type> device_table_;
HashBlock<key_type, index_type> host_table_;
};
} // namespace gpu_cache
\ No newline at end of file
#
# Copyright (c) 2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
cmake_minimum_required(VERSION 3.8)
file(GLOB gpu_cache_src
nv_gpu_cache.cu
static_table.cu
static_hash_table.cu
uvm_table.cu
)
add_library(gpu_cache SHARED ${gpu_cache_src})
target_compile_features(gpu_cache PUBLIC cxx_std_11)
set_target_properties(gpu_cache PROPERTIES CUDA_RESOLVE_DEVICE_SYMBOLS ON)
set_target_properties(gpu_cache PROPERTIES CUDA_RESOLVE_DEVICE_SYMBOLS ON)
set_target_properties(gpu_cache PROPERTIES CUDA_ARCHITECTURES OFF)
This diff is collapsed.
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cooperative_groups.h>
#include <cuda.h>
#include <stdint.h>
#include <stdio.h>
#include <static_hash_table.hpp>
namespace gpu_cache {
template <typename T>
__device__ __forceinline__ T atomicCASHelper(T *address, T compare, T val) {
return atomicCAS(address, compare, val);
}
template <>
__device__ __forceinline__ long long atomicCASHelper(long long *address, long long compare,
long long val) {
return (long long)atomicCAS((unsigned long long *)address, (unsigned long long)compare,
(unsigned long long)val);
}
template <>
__device__ __forceinline__ int64_t atomicCASHelper(int64_t *address, int64_t compare, int64_t val) {
return (int64_t)atomicCAS((unsigned long long *)address, (unsigned long long)compare,
(unsigned long long)val);
}
template <unsigned int group_size, typename key_type, typename size_type, typename hasher,
typename CG>
__device__ size_type insert(key_type *table, size_type capacity, key_type key, const hasher &hash,
const CG &cg, const key_type empty_key, const size_type invalid_slot) {
// If insert successfully, return its position in the table,
// otherwise return invalid_slot.
const size_type num_groups = capacity / group_size;
#if (CUDA_VERSION < 11060)
unsigned long long num_threads_per_group = cg.size();
#else
unsigned long long num_threads_per_group = cg.num_threads();
#endif
const unsigned int num_tiles_per_group = group_size / num_threads_per_group;
// Assuming capacity is a power of 2
size_type slot = hash(key) & (capacity - 1);
slot = slot - (slot & (size_type)(group_size - 1)) + cg.thread_rank();
for (size_type step = 0; step < num_groups; ++step) {
for (unsigned int i = 0; i < num_tiles_per_group; ++i) {
key_type existed_key = table[slot];
// Check if key already exists
bool existed = cg.any(existed_key == key);
if (existed) {
return invalid_slot;
}
// Try to insert the target key into empty slot
while (true) {
int can_insert = cg.ballot(existed_key == empty_key);
if (!can_insert) {
break;
}
bool succeed = false;
int src_lane = __ffs(can_insert) - 1;
if (cg.thread_rank() == src_lane) {
key_type old = atomicCASHelper(table + slot, empty_key, key);
if (old == empty_key) {
// Insert key successfully
succeed = true;
} else if (old == key) {
// The target key was inserted by another thread
succeed = true;
slot = invalid_slot;
} else {
// The empty slot was occupied by another key,
// update the existed_key for next loop.
existed_key = old;
}
}
succeed = cg.shfl(succeed, src_lane);
if (succeed) {
slot = cg.shfl(slot, src_lane);
return slot;
}
}
slot += num_threads_per_group;
}
slot = (slot + group_size * step) & (capacity - 1);
}
return invalid_slot;
}
template <unsigned int tile_size, unsigned int group_size, typename key_type, typename size_type,
typename hasher>
__global__ void InsertKeyKernel(key_type *table_keys, size_type *table_indices, size_type capacity,
const key_type *keys, size_type num_keys, size_type offset,
hasher hash, const key_type empty_key,
const size_type invalid_slot) {
static_assert(tile_size <= group_size, "tile_size cannot be larger than group_size");
auto block = cooperative_groups::this_thread_block();
auto tile = cooperative_groups::tiled_partition<tile_size>(block);
int tile_idx = tile.meta_group_size() * block.group_index().x + tile.meta_group_rank();
int tile_cnt = tile.meta_group_size() * gridDim.x;
for (size_type i = tile_idx; i < num_keys; i += tile_cnt) {
key_type key = keys[i];
if (key == empty_key) {
if (tile.thread_rank() == 0 && table_keys[capacity] != empty_key) {
table_keys[capacity] = empty_key;
table_indices[capacity] = i + offset;
}
continue;
}
size_type slot =
insert<group_size>(table_keys, capacity, key, hash, tile, empty_key, invalid_slot);
if (tile.thread_rank() == 0 && slot != invalid_slot) {
table_indices[slot] = i + offset;
}
}
}
template <unsigned int group_size, typename key_type, typename size_type, typename hasher,
typename CG>
__device__ size_type lookup(key_type *table, size_type capacity, key_type key, const hasher &hash,
const CG &cg, const key_type empty_key, const size_type invalid_slot) {
// If lookup successfully, return the target key's position in the table,
// otherwise return invalid_slot.
const size_type num_groups = capacity / group_size;
#if (CUDA_VERSION < 11060)
unsigned long long num_threads_per_group = cg.size();
#else
unsigned long long num_threads_per_group = cg.num_threads();
#endif
const unsigned int num_tiles_per_group = group_size / num_threads_per_group;
// Assuming capacity is a power of 2
size_type slot = hash(key) & (capacity - 1);
slot = slot - (slot & (size_type)(group_size - 1)) + cg.thread_rank();
for (size_type step = 0; step < num_groups; ++step) {
for (unsigned int i = 0; i < num_tiles_per_group; ++i) {
key_type existed_key = table[slot];
// Check if key exists
int existed = cg.ballot(existed_key == key);
if (existed) {
int src_lane = __ffs(existed) - 1;
slot = cg.shfl(slot, src_lane);
return slot;
}
// The target key doesn't exist
bool contain_empty = cg.any(existed_key == empty_key);
if (contain_empty) {
return invalid_slot;
}
slot += num_threads_per_group;
}
slot = (slot + group_size * step) & (capacity - 1);
}
return invalid_slot;
}
template <int warp_size>
__forceinline__ __device__ void warp_tile_copy(const size_t lane_idx,
const size_t emb_vec_size_in_float,
volatile float *d_dst, const float *d_src) {
// 16 bytes align
if (emb_vec_size_in_float % 4 != 0 || (size_t)d_dst % 16 != 0 || (size_t)d_src % 16 != 0) {
#pragma unroll
for (size_t i = lane_idx; i < emb_vec_size_in_float; i += warp_size) {
d_dst[i] = d_src[i];
}
} else {
#pragma unroll
for (size_t i = lane_idx; i < emb_vec_size_in_float / 4; i += warp_size) {
*(float4 *)(d_dst + i * 4) = __ldg((const float4 *)(d_src + i * 4));
}
}
}
template <int warp_size>
__forceinline__ __device__ void warp_tile_copy(const size_t lane_idx,
const size_t emb_vec_size_in_float,
volatile float *d_dst, const float default_value) {
#pragma unroll
for (size_t i = lane_idx; i < emb_vec_size_in_float; i += warp_size) {
d_dst[i] = default_value;
}
}
template <unsigned int tile_size, unsigned int group_size, typename key_type, typename value_type,
typename size_type, typename hasher>
__global__ void LookupKernel(key_type *table_keys, size_type *table_indices, size_type capacity,
const key_type *keys, int num_keys, const value_type *values,
int value_dim, value_type *output, hasher hash,
const key_type empty_key, const value_type default_value,
const size_type invalid_slot) {
static_assert(tile_size <= group_size, "tile_size cannot be larger than group_size");
constexpr int WARP_SIZE = 32;
static_assert(WARP_SIZE % tile_size == 0, "tile_size must be divisible by warp_size");
auto grid = cooperative_groups::this_grid();
auto block = cooperative_groups::this_thread_block();
auto tile = cooperative_groups::tiled_partition<tile_size>(block);
auto warp_tile = cooperative_groups::tiled_partition<WARP_SIZE>(block);
int tile_idx = tile.meta_group_size() * block.group_index().x + tile.meta_group_rank();
int tile_cnt = tile.meta_group_size() * gridDim.x;
for (int it = 0; it < (num_keys - 1) / tile_cnt + 1; it++) {
size_type slot = invalid_slot;
int key_num = it * tile_cnt + tile_idx;
if (key_num < num_keys) {
key_type key = keys[key_num];
if (key == empty_key) {
if (tile.thread_rank() == 0 && table_keys[capacity] == key) {
slot = capacity;
}
} else {
slot = lookup<group_size>(table_keys, capacity, key, hash, tile, empty_key, invalid_slot);
}
}
for (int i = 0; i < WARP_SIZE / tile_size; i++) {
auto slot_to_read = warp_tile.shfl(slot, i * tile_size);
int idx_to_write = warp_tile.shfl(key_num, 0) + i;
if (idx_to_write >= num_keys) break;
if (slot_to_read == invalid_slot) {
warp_tile_copy<WARP_SIZE>(warp_tile.thread_rank(), value_dim,
output + (size_t)value_dim * idx_to_write, default_value);
continue;
}
auto index = table_indices[slot_to_read];
warp_tile_copy<WARP_SIZE>(warp_tile.thread_rank(), value_dim,
output + (size_t)value_dim * idx_to_write,
values + (size_t)value_dim * index);
}
}
}
template <typename key_type, typename value_type, unsigned int tile_size, unsigned int group_size,
typename hasher>
StaticHashTable<key_type, value_type, tile_size, group_size, hasher>::StaticHashTable(
size_type capacity, int value_dim, hasher hash)
: table_keys_(nullptr),
table_indices_(nullptr),
key_capacity_(capacity * 2),
table_values_(nullptr),
value_capacity_(capacity),
value_dim_(value_dim),
size_(0),
hash_(hash) {
// Check parameters
if (capacity <= 0) {
printf("Error: capacity must be larger than 0\n");
exit(EXIT_FAILURE);
}
if (value_dim <= 0) {
printf("Error: value_dim must be larger than 0\n");
exit(EXIT_FAILURE);
}
// Make key_capacity_ be a power of 2
size_t new_capacity = group_size;
while (new_capacity < key_capacity_) {
new_capacity *= 2;
}
key_capacity_ = new_capacity;
// Allocate device memory
size_t align_m = 16;
size_t num_keys = key_capacity_ + 1;
size_t num_values = (value_capacity_ * value_dim_ + align_m - 1) / align_m * align_m;
CUDA_CHECK(cudaMalloc(&table_keys_, sizeof(key_type) * num_keys));
CUDA_CHECK(cudaMalloc(&table_indices_, sizeof(size_type) * num_keys));
CUDA_CHECK(cudaMalloc(&table_values_, sizeof(value_type) * num_values));
// Initialize table_keys_
CUDA_CHECK(cudaMemset(table_keys_, 0xff, sizeof(key_type) * key_capacity_));
CUDA_CHECK(cudaMemset(table_keys_ + key_capacity_, 0, sizeof(key_type)));
}
template <typename key_type, typename value_type, unsigned int tile_size, unsigned int group_size,
typename hasher>
void StaticHashTable<key_type, value_type, tile_size, group_size, hasher>::insert(
const key_type *keys, const value_type *values, size_type num_keys, cudaStream_t stream) {
if (num_keys == 0) {
return;
}
if (num_keys <= 0 || (size() + num_keys) > capacity()) {
printf("Error: Invalid num_keys to insert\n");
exit(EXIT_FAILURE);
}
// Insert keys
constexpr int block = 256;
int grid = (num_keys - 1) / block + 1;
InsertKeyKernel<tile_size, group_size>
<<<grid, block, 0, stream>>>(table_keys_, table_indices_, key_capacity_, keys, num_keys,
size_, hash_, empty_key, invalid_slot);
// Copy values
CUDA_CHECK(cudaMemcpyAsync(table_values_ + size_ * value_dim_, values,
sizeof(value_type) * num_keys * value_dim_, cudaMemcpyDeviceToDevice,
stream));
size_ += num_keys;
}
template <typename key_type, typename value_type, unsigned int tile_size, unsigned int group_size,
typename hasher>
void StaticHashTable<key_type, value_type, tile_size, group_size, hasher>::clear(
cudaStream_t stream) {
CUDA_CHECK(cudaMemsetAsync(table_keys_, 0xff, sizeof(key_type) * key_capacity_, stream));
CUDA_CHECK(cudaMemsetAsync(table_keys_ + key_capacity_, 0, sizeof(key_type), stream));
size_ = 0;
}
template <typename key_type, typename value_type, unsigned int tile_size, unsigned int group_size,
typename hasher>
StaticHashTable<key_type, value_type, tile_size, group_size, hasher>::~StaticHashTable() {
CUDA_CHECK(cudaFree(table_keys_));
CUDA_CHECK(cudaFree(table_indices_));
CUDA_CHECK(cudaFree(table_values_));
}
template <typename key_type, typename value_type, unsigned int tile_size, unsigned int group_size,
typename hasher>
void StaticHashTable<key_type, value_type, tile_size, group_size, hasher>::lookup(
const key_type *keys, value_type *values, int num_keys, value_type default_value,
cudaStream_t stream) {
if (num_keys == 0) {
return;
}
constexpr int block = 256;
const int grid = (num_keys - 1) / block + 1;
// Lookup keys
LookupKernel<tile_size, group_size><<<grid, block, 0, stream>>>(
table_keys_, table_indices_, key_capacity_, keys, num_keys, table_values_, value_dim_, values,
hash_, empty_key, default_value, invalid_slot);
}
template class StaticHashTable<long long, float>;
template class StaticHashTable<uint32_t, float>;
} // namespace gpu_cache
\ No newline at end of file
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cooperative_groups.h>
#include <nv_util.h>
#include <iostream>
#include <static_hash_table.hpp>
#include <static_table.hpp>
namespace gpu_cache {
template <typename key_type>
static_table<key_type>::static_table(const size_t table_size, const size_t embedding_vec_size,
const float default_value)
: table_size_(table_size),
embedding_vec_size_(embedding_vec_size),
default_value_(default_value),
static_hash_table_(table_size, embedding_vec_size) {
if (embedding_vec_size_ == 0) {
printf("Error: Invalid value for embedding_vec_size.\n");
return;
}
}
template <typename key_type>
void static_table<key_type>::Query(const key_type* d_keys, const size_t len, float* d_values,
cudaStream_t stream) {
static_hash_table_.lookup(d_keys, d_values, len, default_value_, stream);
}
template <typename key_type>
void static_table<key_type>::Init(const key_type* d_keys, const size_t len, const float* d_values,
cudaStream_t stream) {
static_hash_table_.insert(d_keys, d_values, len, stream);
}
template <typename key_type>
void static_table<key_type>::Clear(cudaStream_t stream) {
static_hash_table_.clear(stream);
}
template class static_table<unsigned int>;
template class static_table<long long>;
} // namespace gpu_cache
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment