diff --git a/CMakeLists.txt b/CMakeLists.txt index 342956b682..81bf712133 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -367,9 +367,9 @@ set(ZSTD_INCLUDE_DIR ${INSTALL_INCLUDEDIR}) ExternalProject_Add(fmt DEPENDS URL - https://github.com/fmtlib/fmt/archive/refs/tags/7.1.0.tar.gz + https://github.com/fmtlib/fmt/archive/refs/tags/10.2.1.tar.gz URL_HASH - MD5=32af902636d373641f4ef9032fc65b3a + MD5=dc09168c94f90ea890257995f2c497a5 DOWNLOAD_NO_PROGRESS 1 UPDATE_COMMAND diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index 0ec3d1bcb1..f0e519b4db 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -37,51 +37,57 @@ class ThreadPool : public pstd::noncopyable { public: class Worker { public: - explicit Worker(ThreadPool* tp) : start_(false), thread_pool_(tp){}; + explicit Worker(size_t size) : start_(false), should_stop_(false), max_queue_size_(size) {}; static void* WorkerMain(void* arg); int start(); int stop(); + void Schedule(TaskFunc func, void* arg); + size_t cur_queue_size(); + size_t cur_time_queue_size(); + private: + void runInThread(); + size_t max_queue_size(); + void DelaySchedule(uint64_t timeout, TaskFunc func, void* arg); + bool should_stop(); + void set_should_stop(); + pthread_t thread_id_; std::atomic start_; - ThreadPool* const thread_pool_; std::string worker_name_; + std::queue queue_; + std::priority_queue time_queue_; + pstd::Mutex mu_; + pstd::CondVar rsignal_; + pstd::CondVar wsignal_; + std::atomic should_stop_; + size_t max_queue_size_; }; explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool"); virtual ~ThreadPool(); - + void Schedule(TaskFunc func, void* arg); int start_thread_pool(); int stop_thread_pool(); - bool should_stop(); - void set_should_stop(); - - void Schedule(TaskFunc func, void* arg); - void DelaySchedule(uint64_t timeout, TaskFunc func, void* arg); size_t max_queue_size(); size_t worker_size(); + std::string thread_pool_name(); void cur_queue_size(size_t* qsize); void cur_time_queue_size(size_t* qsize); - std::string thread_pool_name(); private: - void runInThread(); + /* + * Here we used auto poll to find the next work thread, + * last_thread_ is the last work thread + */ size_t worker_num_; size_t max_queue_size_; std::string thread_pool_name_; - std::queue queue_; - std::priority_queue time_queue_; std::vector workers_; std::atomic running_; - std::atomic should_stop_; - - pstd::Mutex mu_; - pstd::CondVar rsignal_; - pstd::CondVar wsignal_; - }; } // namespace net diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 8e20694244..bf2f4912c7 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -13,26 +13,31 @@ namespace net { +thread_local int last_thread_ = 0; + void* ThreadPool::Worker::WorkerMain(void* arg) { - auto tp = static_cast(arg); - tp->runInThread(); + Worker* worker = static_cast(arg); + worker->runInThread(); return nullptr; } int ThreadPool::Worker::start() { if (!start_.load()) { - if (pthread_create(&thread_id_, nullptr, &WorkerMain, thread_pool_) != 0) { + if (pthread_create(&thread_id_, nullptr, &WorkerMain, this) != 0) { return -1; } else { start_.store(true); std::string thread_id_str = std::to_string(reinterpret_cast(thread_id_)); - SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str); + SetThreadName(thread_id_, "Worker_" + thread_id_str); } } return 0; } int ThreadPool::Worker::stop() { + should_stop_.store(true); + rsignal_.notify_one(); + wsignal_.notify_one(); if (start_.load()) { if (pthread_join(thread_id_, nullptr) != 0) { return -1; @@ -43,20 +48,18 @@ int ThreadPool::Worker::stop() { return 0; } -ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) +ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) : worker_num_(worker_num), max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), - running_(false), - should_stop_(false) {} + running_(false) {} ThreadPool::~ThreadPool() { stop_thread_pool(); } int ThreadPool::start_thread_pool() { if (!running_.load()) { - should_stop_.store(false); for (size_t i = 0; i < worker_num_; ++i) { - workers_.push_back(new Worker(this)); + workers_.push_back(new Worker(max_queue_size_)); int res = workers_[i]->start(); if (res != 0) { return kCreateThreadError; @@ -70,9 +73,6 @@ int ThreadPool::start_thread_pool() { int ThreadPool::stop_thread_pool() { int res = 0; if (running_.load()) { - should_stop_.store(true); - rsignal_.notify_all(); - wsignal_.notify_all(); for (const auto worker : workers_) { res = worker->stop(); if (res != 0) { @@ -87,50 +87,77 @@ int ThreadPool::stop_thread_pool() { return res; } -bool ThreadPool::should_stop() { return should_stop_.load(); } +bool ThreadPool::Worker::should_stop() { return should_stop_.load(); } -void ThreadPool::set_should_stop() { should_stop_.store(true); } +void ThreadPool::Worker::set_should_stop() { should_stop_.store(true); } -void ThreadPool::Schedule(TaskFunc func, void* arg) { - std::unique_lock lock(mu_); +void ThreadPool::Worker::Schedule(TaskFunc func, void* arg) { + std::unique_lock lock(mu_); wsignal_.wait(lock, [this]() { return queue_.size() < max_queue_size_ || should_stop(); }); - - if (!should_stop()) { + if (queue_.size() < max_queue_size_ && !should_stop()) { queue_.emplace(func, arg); rsignal_.notify_one(); } } +void ThreadPool::Schedule(TaskFunc func, void* arg) { + int next_thread = last_thread_; + for (int cnt = 0; cnt < worker_num_; cnt++) { + Worker* worker = workers_[next_thread]; + worker->Schedule(func, arg); + last_thread_ = (next_thread + 1) % worker_num_; + break; + } +} + /* * timeout is in millisecond */ -void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { +void ThreadPool::Worker::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { auto now = std::chrono::system_clock::now(); uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t exec_time = unow + timeout * 1000; - std::lock_guard lock(mu_); + std::unique_lock lock(mu_); if (!should_stop()) { time_queue_.emplace(exec_time, func, arg); - rsignal_.notify_all(); + rsignal_.notify_one(); } } +size_t ThreadPool::Worker::max_queue_size() { return max_queue_size_; } + +size_t ThreadPool::Worker::cur_queue_size() { + std::lock_guard lock(mu_); + return queue_.size(); +} + +size_t ThreadPool::Worker::cur_time_queue_size() { + std::lock_guard lock(mu_); + return time_queue_.size(); +} + size_t ThreadPool::max_queue_size() { return max_queue_size_; } void ThreadPool::cur_queue_size(size_t* qsize) { - std::lock_guard lock(mu_); - *qsize = queue_.size(); + size_t qsize_sum = 0; + for (const auto worker : workers_) { + qsize_sum += worker->cur_queue_size(); + } + qsize = &qsize_sum; } void ThreadPool::cur_time_queue_size(size_t* qsize) { - std::lock_guard lock(mu_); - *qsize = time_queue_.size(); + size_t qsize_sum = 0; + for (const auto worker : workers_) { + qsize_sum += worker->cur_time_queue_size(); + } + qsize = &qsize_sum; } std::string ThreadPool::thread_pool_name() { return thread_pool_name_; } -void ThreadPool::runInThread() { +void ThreadPool::Worker::runInThread() { while (!should_stop()) { std::unique_lock lock(mu_); rsignal_.wait(lock, [this]() { return !queue_.empty() || !time_queue_.empty() || should_stop(); });