Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add worker thread queue #2945

Draft
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ set(ZSTD_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
ExternalProject_Add(fmt
DEPENDS
URL
https://github.com/fmtlib/fmt/archive/refs/tags/7.1.0.tar.gz
https://github.com/fmtlib/fmt/archive/refs/tags/10.2.1.tar.gz
URL_HASH
MD5=32af902636d373641f4ef9032fc65b3a
MD5=dc09168c94f90ea890257995f2c497a5
DOWNLOAD_NO_PROGRESS
1
UPDATE_COMMAND
Expand Down
42 changes: 24 additions & 18 deletions src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,57 @@ class ThreadPool : public pstd::noncopyable {
public:
class Worker {
public:
explicit Worker(ThreadPool* tp) : start_(false), thread_pool_(tp){};
explicit Worker(size_t size) : start_(false), should_stop_(false), max_queue_size_(size) {};
static void* WorkerMain(void* arg);

int start();
int stop();
void Schedule(TaskFunc func, void* arg);
size_t cur_queue_size();
size_t cur_time_queue_size();


private:
void runInThread();
size_t max_queue_size();
void DelaySchedule(uint64_t timeout, TaskFunc func, void* arg);
bool should_stop();
void set_should_stop();

pthread_t thread_id_;
std::atomic<bool> start_;
ThreadPool* const thread_pool_;
std::string worker_name_;
std::queue<Task> queue_;
std::priority_queue<TimeTask> time_queue_;
pstd::Mutex mu_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;
std::atomic<bool> should_stop_;
size_t max_queue_size_;
};

explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
virtual ~ThreadPool();

void Schedule(TaskFunc func, void* arg);
int start_thread_pool();
int stop_thread_pool();
bool should_stop();
void set_should_stop();

void Schedule(TaskFunc func, void* arg);
void DelaySchedule(uint64_t timeout, TaskFunc func, void* arg);
size_t max_queue_size();
size_t worker_size();
std::string thread_pool_name();
void cur_queue_size(size_t* qsize);
void cur_time_queue_size(size_t* qsize);
std::string thread_pool_name();

private:
void runInThread();
/*
* Here we used auto poll to find the next work thread,
* last_thread_ is the last work thread
*/

size_t worker_num_;
size_t max_queue_size_;
std::string thread_pool_name_;
std::queue<Task> queue_;
std::priority_queue<TimeTask> time_queue_;
std::vector<Worker*> workers_;
std::atomic<bool> running_;
std::atomic<bool> should_stop_;

pstd::Mutex mu_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;

};

} // namespace net
Expand Down
79 changes: 53 additions & 26 deletions src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,31 @@

namespace net {

thread_local int last_thread_ = 0;

void* ThreadPool::Worker::WorkerMain(void* arg) {
auto tp = static_cast<ThreadPool*>(arg);
tp->runInThread();
Worker* worker = static_cast<Worker*>(arg);
worker->runInThread();
return nullptr;
}

int ThreadPool::Worker::start() {
if (!start_.load()) {
if (pthread_create(&thread_id_, nullptr, &WorkerMain, thread_pool_) != 0) {
if (pthread_create(&thread_id_, nullptr, &WorkerMain, this) != 0) {
return -1;
} else {
start_.store(true);
std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str);
SetThreadName(thread_id_, "Worker_" + thread_id_str);
}
}
return 0;
}

int ThreadPool::Worker::stop() {
should_stop_.store(true);
rsignal_.notify_one();
wsignal_.notify_one();
if (start_.load()) {
if (pthread_join(thread_id_, nullptr) != 0) {
return -1;
Expand All @@ -43,20 +48,18 @@ int ThreadPool::Worker::stop() {
return 0;
}

ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
: worker_num_(worker_num),
max_queue_size_(max_queue_size),
thread_pool_name_(std::move(thread_pool_name)),
running_(false),
should_stop_(false) {}
running_(false) {}

ThreadPool::~ThreadPool() { stop_thread_pool(); }

int ThreadPool::start_thread_pool() {
if (!running_.load()) {
should_stop_.store(false);
for (size_t i = 0; i < worker_num_; ++i) {
workers_.push_back(new Worker(this));
workers_.push_back(new Worker(max_queue_size_));
int res = workers_[i]->start();
if (res != 0) {
return kCreateThreadError;
Expand All @@ -70,9 +73,6 @@ int ThreadPool::start_thread_pool() {
int ThreadPool::stop_thread_pool() {
int res = 0;
if (running_.load()) {
should_stop_.store(true);
rsignal_.notify_all();
wsignal_.notify_all();
for (const auto worker : workers_) {
res = worker->stop();
if (res != 0) {
Expand All @@ -87,50 +87,77 @@ int ThreadPool::stop_thread_pool() {
return res;
}

bool ThreadPool::should_stop() { return should_stop_.load(); }
bool ThreadPool::Worker::should_stop() { return should_stop_.load(); }

void ThreadPool::set_should_stop() { should_stop_.store(true); }
void ThreadPool::Worker::set_should_stop() { should_stop_.store(true); }

void ThreadPool::Schedule(TaskFunc func, void* arg) {
std::unique_lock lock(mu_);
void ThreadPool::Worker::Schedule(TaskFunc func, void* arg) {
std::unique_lock lock(mu_);
wsignal_.wait(lock, [this]() { return queue_.size() < max_queue_size_ || should_stop(); });

if (!should_stop()) {
if (queue_.size() < max_queue_size_ && !should_stop()) {
queue_.emplace(func, arg);
rsignal_.notify_one();
}
}

void ThreadPool::Schedule(TaskFunc func, void* arg) {
int next_thread = last_thread_;
for (int cnt = 0; cnt < worker_num_; cnt++) {
Worker* worker = workers_[next_thread];
worker->Schedule(func, arg);
last_thread_ = (next_thread + 1) % worker_num_;
break;
}
}

/*
* timeout is in millisecond
*/
void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {
void ThreadPool::Worker::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {
auto now = std::chrono::system_clock::now();
uint64_t unow = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
uint64_t exec_time = unow + timeout * 1000;

std::lock_guard lock(mu_);
std::unique_lock lock(mu_);
if (!should_stop()) {
time_queue_.emplace(exec_time, func, arg);
rsignal_.notify_all();
rsignal_.notify_one();
}
}

size_t ThreadPool::Worker::max_queue_size() { return max_queue_size_; }

size_t ThreadPool::Worker::cur_queue_size() {
std::lock_guard lock(mu_);
return queue_.size();
}

size_t ThreadPool::Worker::cur_time_queue_size() {
std::lock_guard lock(mu_);
return time_queue_.size();
}

size_t ThreadPool::max_queue_size() { return max_queue_size_; }

void ThreadPool::cur_queue_size(size_t* qsize) {
std::lock_guard lock(mu_);
*qsize = queue_.size();
size_t qsize_sum = 0;
for (const auto worker : workers_) {
qsize_sum += worker->cur_queue_size();
}
qsize = &qsize_sum;
}

void ThreadPool::cur_time_queue_size(size_t* qsize) {
std::lock_guard lock(mu_);
*qsize = time_queue_.size();
size_t qsize_sum = 0;
for (const auto worker : workers_) {
qsize_sum += worker->cur_time_queue_size();
}
qsize = &qsize_sum;
}

std::string ThreadPool::thread_pool_name() { return thread_pool_name_; }

void ThreadPool::runInThread() {
void ThreadPool::Worker::runInThread() {
while (!should_stop()) {
std::unique_lock lock(mu_);
rsignal_.wait(lock, [this]() { return !queue_.empty() || !time_queue_.empty() || should_stop(); });
Expand Down
Loading