Unverified Commit a41d2163 authored by wang jiahao's avatar wang jiahao Committed by GitHub
Browse files

Merge pull request #1013 from kvcache-ai/work-concurrent

In v0.2.4 version, we’ve added highly desired multi-concurrency support to the community through a major refactor of the whole architecture.
parents f142f4df 4ed9744e
#include <atomic>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
template <typename T>
class MPSCQueue {
struct Node {
std::shared_ptr<T> data;
std::atomic<Node*> next;
Node() : next(nullptr) {}
Node(std::shared_ptr<T> data_) : data(std::move(data_)), next(nullptr) {}
};
std::atomic<Node*> head;
Node* tail;
public:
std::atomic_size_t enqueue_count = 0;
size_t dequeue_count = 0;
MPSCQueue() {
Node* dummy = new Node();
head.store(dummy, std::memory_order_relaxed);
tail = dummy;
}
~MPSCQueue() {
// 清理剩余的节点
Node* node = tail;
while (node) {
Node* next = node->next.load(std::memory_order_relaxed);
delete node;
node = next;
}
}
// 生产者调用
void enqueue(std::shared_ptr<T> data) {
enqueue_count.fetch_add(1);
Node* node = new Node(std::move(data));
Node* prev_head = head.exchange(node, std::memory_order_acq_rel);
prev_head->next.store(node, std::memory_order_release);
}
// 消费者调用
std::shared_ptr<T> dequeue() {
Node* next = tail->next.load(std::memory_order_acquire);
if (next) {
std::shared_ptr<T> res = std::move(next->data);
delete tail;
tail = next;
dequeue_count += 1;
return res;
}
return nullptr;
}
};
\ No newline at end of file
#include <atomic>
#include <cassert>
#include <iostream>
#include <optional>
#include <semaphore>
template <typename T>
class MPSCQueue {
struct Node {
T data;
std::atomic<Node*> next;
Node() : next(nullptr) {}
Node(T data_) : data(std::move(data_)), next(nullptr) {}
};
std::atomic<Node*> head;
Node* tail;
public:
std::atomic_size_t enqueue_count = 0;
size_t dequeue_count = 0;
MPSCQueue() {
Node* dummy = new Node();
head.store(dummy, std::memory_order_seq_cst);
tail = dummy;
}
~MPSCQueue() {
Node* node = tail;
while (node) {
Node* next = node->next.load(std::memory_order_seq_cst);
delete node;
node = next;
}
}
// 生产者调用
void enqueue(T data) {
enqueue_count.fetch_add(1);
Node* node = new Node(std::move(data));
Node* prev_head = head.exchange(node, std::memory_order_seq_cst);
prev_head->next.store(node, std::memory_order_seq_cst);
}
// 消费者调用
std::optional<T> dequeue() {
Node* next = tail->next.load(std::memory_order_seq_cst);
if (next) {
T res = std::move(next->data);
delete tail;
tail = next;
dequeue_count += 1;
return res;
}
return std::nullopt;
}
size_t size() { return enqueue_count.load() - dequeue_count; }
};
template <typename T>
class MPSCQueueConsumerLock {
MPSCQueue<T> queue;
std::counting_semaphore<> sema{0};
public:
void enqueue(T data) {
queue.enqueue(std::move(data));
// std::atomic_thread_fence(std::memory_order_seq_cst);// Inserting this because the memory order might be wrong, I
// am also not that sure about this.
sema.release();
}
T dequeue() {
auto re = queue.dequeue();
if (re.has_value()) {
while (sema.try_acquire() == false) {
std::cerr << __FILE__ << ":" << __FUNCTION__ << " sema try acquire should be success, retrying, please check"
<< std::endl;
// assert(false);
}
return re.value();
}
sema.acquire();
return queue.dequeue().value();
}
size_t size() { return queue.size(); }
};
#ifndef __MUTEX_EXTEND_HPP_
#define __MUTEX_EXTEND_HPP_
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
class non_recursive_mutex {
public:
non_recursive_mutex() = default;
// 使用 try_lock 实现非递归锁
bool try_lock() {
std::thread::id this_id = std::this_thread::get_id();
// 检查当前线程是否已经持有该锁
if (owner.load(std::memory_order_acquire) == this_id) {
return false; // 如果是当前线程,返回失败
}
// 尝试加锁
if (mtx.try_lock()) {
owner.store(this_id, std::memory_order_release); // 设置锁的拥有者
return true;
}
return false;
}
// lock 会阻塞,直到获得锁
void lock() {
std::thread::id this_id = std::this_thread::get_id();
while (true) {
// 检查当前线程是否已经持有该锁
if (owner.load(std::memory_order_acquire) == this_id) {
throw std::runtime_error("Thread is trying to lock a mutex it already holds");
}
// 尝试加锁
if (mtx.try_lock()) {
owner.store(this_id, std::memory_order_release); // 设置锁的拥有者
return;
}
// 如果锁未获得,则稍微等待,防止忙等
std::this_thread::yield();
}
}
// 解锁
void unlock() {
std::thread::id this_id = std::this_thread::get_id();
// 确保只有持有锁的线程可以解锁
if (owner.load(std::memory_order_acquire) == this_id) {
owner.store(std::thread::id(), std::memory_order_release); // 清除锁的拥有者
mtx.unlock();
} else {
throw std::runtime_error("Thread attempting to unlock a mutex it doesn't own");
}
}
private:
std::mutex mtx; // 实际的互斥量
std::atomic<std::thread::id> owner; // 原子变量,记录当前锁的拥有者
};
#endif
#ifndef PERIODIC_TASK_HPP
#define PERIODIC_TASK_HPP
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <stop_token>
#include <thread>
#include <utility>
#include <vector>
namespace periodic {
class PeriodicTask {
public:
explicit PeriodicTask(std::function<void()> func,
std::chrono::milliseconds interval_ms = std::chrono::milliseconds(100))
: func_(std::move(func)), interval_(interval_ms), worker_([this](std::stop_token stoken) { this->run(stoken); }) {
// std::cout << "PeriodicTask created with interval: " << interval_.count() << " ms" << std::endl;
}
~PeriodicTask() {
worker_.request_stop();
cv_.notify_one(); // Ensure worker wakes up when destroyed
// std::cout << "PeriodicTask destructor called, stopping worker." << std::endl;
}
void wakeUp() {
{
std::lock_guard<std::mutex> lock(wakeup_mutex_);
wake_up_requested_ = true;
}
cv_.notify_one(); // Notify worker thread to wake up immediately
// std::cout << "wakeUp() called: worker thread will wake up." << std::endl;
}
std::future<void> wakeUpWait() {
std::promise<void> promise;
std::future<void> future = promise.get_future();
{
std::lock_guard<std::mutex> lock(promise_mutex_);
wakeup_promises_.push_back(std::move(promise));
}
wakeUp();
return future;
}
private:
void run(std::stop_token stoken) {
while (!stoken.stop_requested()) {
std::unique_lock lock(mutex_);
// Wait for either the time interval or a wake-up signal
cv_.wait_for(lock, interval_, [this] { return wake_up_requested_.load(); });
if (stoken.stop_requested())
break;
// If the wake-up was triggered, reset the flag and process the task
{
std::lock_guard<std::mutex> lock(wakeup_mutex_);
wake_up_requested_ = false;
}
try {
// std::cout << "Running task function." << std::endl;
func_();
} catch (...) {
std::cerr << "Error in task function." << std::endl;
}
notifyPromises();
}
}
void notifyPromises() {
std::lock_guard<std::mutex> lock(promise_mutex_);
// std::cout << "Notifying all waiting promises." << std::endl;
for (auto& promise : wakeup_promises_) {
promise.set_value();
}
wakeup_promises_.clear();
}
std::function<void()> func_;
std::chrono::milliseconds interval_;
std::mutex mutex_;
std::condition_variable cv_;
std::vector<std::promise<void>> wakeup_promises_;
std::mutex promise_mutex_;
std::mutex wakeup_mutex_;
std::atomic<bool> wake_up_requested_ = false;
std::jthread worker_;
};
} // namespace periodic
#endif // PERIODIC_TASK_HPP
/*
* @Author: Xie Weiyu ervinxie@qq.com
* @Date: 2024-11-21 06:35:47
* @LastEditors: Xie Weiyu ervinxie@qq.com
* @LastEditTime: 2024-11-21 06:35:50
* @FilePath: /kvc2/src/utils/spin_lock.hpp
* @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置:
* https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
*/
#include <atomic>
#include <chrono>
#include <thread>
class SpinLock {
public:
SpinLock() { flag.clear(); }
void lock() {
const int max_delay = 1024; // Maximum delay in microseconds
int delay = 1; // Initial delay in microseconds
while (flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::sleep_for(std::chrono::microseconds(delay));
delay *= 2;
if (delay > max_delay) {
delay = max_delay;
}
}
}
void unlock() { flag.clear(std::memory_order_release); }
private:
std::atomic_flag flag = ATOMIC_FLAG_INIT;
};
This diff is collapsed.
set(CMAKE_CXX_FLAGS "-Og -march=native -Wall -Wextra -g -fopenmp")
# set(CMAKE_CXX_FLAGS "-O3 -march=native -Wall -Wextra -pthread")
add_subdirectory(kvc2test)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../src)
add_executable(hashmap_test hashmap_test.cpp)
target_link_libraries(hashmap_test PRIVATE TBB::tbb)
add_executable(xxHash_test xxHash_test.cpp)
target_link_libraries(xxHash_test PRIVATE xxhash)
function(add_async_store_executable source_file)
get_filename_component(target_name ${source_file} NAME_WE) # 获取不带扩展名的文件名作为目标名
add_executable(${target_name} ${source_file})
target_include_directories(${target_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
target_include_directories(${target_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann/single_include)
target_include_directories(${target_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/spdlog/include)
target_link_libraries(${target_name} PRIVATE async_store gflags)
endfunction()
add_async_store_executable(async_store_test.cpp)
function(add_kvc2_executable source_file)
get_filename_component(target_name ${source_file} NAME_WE) # 获取不带扩展名的文件名作为目标名
add_executable(${target_name} ${source_file})
# target_compile_options(${target_name} PRIVATE -fopenmp -fno-strict-aliasing)
target_include_directories(${target_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
target_include_directories(${target_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann/single_include)
target_include_directories(${target_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/spdlog/include)
target_link_libraries(${target_name} PRIVATE kvc2 async_store gflags)
endfunction()
add_kvc2_executable(test_lock_free_queue.cpp)
add_kvc2_executable(test_queue_perf.cpp)
# Disable deprecated test
# add_kvc2_executable(prefix_test.cpp)
# add_kvc2_executable(kvcache_disk_insert_read_test.cpp)
# add_kvc2_executable(kvcache_mem_eviction_test.cpp)
# add_kvc2_executable(kvcache_mem_insert_read_test.cpp)
# add_kvc2_executable(kvcache_save_load_test.cpp)
# add_kvc2_executable(kvc2_export_header_test.cpp)
# add_kvc2_executable(kvc2_export_load_test.cpp)
target_include_directories(async_store_test PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/..//third_party/nlohmann/single_include)
target_include_directories(async_store_test PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/..//third_party/spdlog/include)
target_link_libraries(async_store_test PRIVATE xxhash)
add_executable(test_std_list test_std_list.cpp)
add_executable(test_cuda_stream test_cuda_stream.cpp)
target_include_directories(test_cuda_stream PRIVATE ${CUDAToolkit_INCLUDE_DIRS})
target_link_libraries(test_cuda_stream PRIVATE CUDA::cudart)
add_executable(test_cuda_stream_manager test_cuda_stream_manager.cpp)
target_include_directories(test_cuda_stream_manager PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
target_link_libraries(test_cuda_stream_manager PRIVATE cuda_stream_manager)
add_executable(test_periodic_task test_periodic_task.cpp)
target_include_directories(test_periodic_task PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
add_executable(test_page_pool page_pool_test.cpp)
target_include_directories(test_page_pool PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../src)
target_include_directories(test_page_pool PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/spdlog/include)
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment