diff --git a/src/client.cc b/src/client.cc index 01e76de99..248a1cd2f 100644 --- a/src/client.cc +++ b/src/client.cc @@ -451,6 +451,7 @@ void PClient::OnConnect() { std::string PClient::PeerIP() const { if (!addr_.IsValid()) { + ERROR("Invalid address detected for client {}", uniqueID()); return ""; } return addr_.GetIP(); @@ -458,6 +459,7 @@ std::string PClient::PeerIP() const { int PClient::PeerPort() const { if (!addr_.IsValid()) { + ERROR("Invalid address detected for client {}", uniqueID()); return 0; } return addr_.GetPort(); diff --git a/src/common.h b/src/common.h index d90aaa969..32a956d57 100644 --- a/src/common.h +++ b/src/common.h @@ -60,8 +60,6 @@ extern struct PErrorInfo { int StrToLongDouble(const char* s, size_t slen, long double* ldval); -// class UnboundedBuffer; - std::size_t FormatInt(long value, UnboundedBuffer* reply); std::size_t FormatBulk(const char* str, std::size_t len, UnboundedBuffer* reply); std::size_t FormatBulk(const PString& str, UnboundedBuffer* reply); diff --git a/src/io_thread_pool.cc b/src/io_thread_pool.cc deleted file mode 100644 index 6ad654ca1..000000000 --- a/src/io_thread_pool.cc +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - */ - -// #include -// -// #include "io_thread_pool.h" -// #include "pstd/log.h" -// #include "util.h" -// -// namespace pikiwidb { -// -// const size_t IOThreadPool::kMaxWorkers = 128; -// -// bool IOThreadPool::SetWorkerNum(size_t num) { -// if (num <= 1) { -// return true; -// } -// -// if (state_ != State::kNone) { -// ERROR("can only called before application run"); -// return false; -// } -// -// if (!worker_loops_.empty()) { -// ERROR("can only called once, not empty loops size: {}", worker_loops_.size()); -// return false; -// } -// -// worker_num_.store(num); -// worker_threads_.reserve(num); -// worker_loops_.reserve(num); -// -// return true; -// } -// -// bool IOThreadPool::Init(const char* ip, int port, const NewTcpConnectionCallback& cb) { -// auto f = [this] { return ChooseNextWorkerEventLoop(); }; -// -// base_.Init(); -// INFO("base loop {} {}, g_baseLoop {}", base_.GetName(), static_cast(&base_), -// static_cast(pikiwidb::EventLoop::Self())); -// if (!base_.Listen(ip, port, cb, f)) { -// ERROR("can not bind socket on addr {}:{}", ip, port); -// return false; -// } -// -// return true; -// } -// -// void IOThreadPool::Run(int ac, char* av[]) { -// assert(state_ == State::kNone); -// INFO("Process {} starting...", name_); -// -// // start loops in thread pool -// StartWorkers(); -// base_.Run(); -// -// for (auto& w : worker_threads_) { -// if (w.joinable()) { -// w.join(); -// } -// } -// -// worker_threads_.clear(); -// -// INFO("Process {} stopped, goodbye...", name_); -// } -// -// void IOThreadPool::Exit() { -// state_ = State::kStopped; -// -// BaseLoop()->Stop(); -// for (const auto& worker_loop : worker_loops_) { -// EventLoop* loop = worker_loop.get(); -// loop->Stop(); -// } -// } -// -// bool IOThreadPool::IsExit() const { return state_ == State::kStopped; } -// -// EventLoop* IOThreadPool::BaseLoop() { return &base_; } -// -// EventLoop* IOThreadPool::ChooseNextWorkerEventLoop() { -// if (worker_loops_.empty()) { -// return BaseLoop(); -// } -// -// auto& loop = worker_loops_[current_worker_loop_++ % worker_loops_.size()]; -// return loop.get(); -// } -// -// void IOThreadPool::StartWorkers() { -// // only called by main thread -// assert(state_ == State::kNone); -// -// size_t index = 1; -// while (worker_loops_.size() < worker_num_) { -// std::unique_ptr loop = std::make_unique(); -// if (!name_.empty()) { -// loop->SetName(name_ + "_" + std::to_string(index++)); -// INFO("loop {}, name {}", static_cast(loop.get()), loop->GetName().c_str()); -// } -// worker_loops_.push_back(std::move(loop)); -// } -// -// for (index = 0; index < worker_loops_.size(); ++index) { -// EventLoop* loop = worker_loops_[index].get(); -// std::thread t([loop]() { -// loop->Init(); -// loop->Run(); -// }); -// INFO("thread {}, thread loop {}, loop name {}", index, static_cast(loop), loop->GetName().c_str()); -// worker_threads_.push_back(std::move(t)); -// } -// -// state_ = State::kStarted; -// } -// -// void IOThreadPool::SetName(const std::string& name) { name_ = name; } -// -// bool IOThreadPool::Listen(const char* ip, int port, const NewTcpConnectionCallback& ccb) { -// auto f = [this] { return ChooseNextWorkerEventLoop(); }; -// auto loop = BaseLoop(); -// return loop->Execute([loop, ip, port, ccb, f]() { return loop->Listen(ip, port, ccb, f); }).get(); -// } -// -// void IOThreadPool::Connect(const char* ip, int port, const NewTcpConnectionCallback& ccb, -// const TcpConnectionFailCallback& fcb, EventLoop* loop) { -// if (!loop) { -// loop = ChooseNextWorkerEventLoop(); -// } -// -// std::string ipStr(ip); -// loop->Execute([loop, ipStr, port, ccb, fcb]() { loop->Connect(ipStr.c_str(), port, ccb, fcb); }); -// } -// -// std::shared_ptr IOThreadPool::ListenHTTP(const char* ip, int port, HttpServer::OnNewClient cb) { -// auto server = std::make_shared(); -// server->SetOnNewHttpContext(std::move(cb)); -// -// // capture server to make it long live with TcpListener -// auto ncb = [server](TcpConnection* conn) { server->OnNewConnection(conn); }; -// Listen(ip, port, ncb); -// -// return server; -// } -// -// std::shared_ptr IOThreadPool::ConnectHTTP(const char* ip, int port, EventLoop* loop) { -// auto client = std::make_shared(); -// -// // capture client to make it long live with TcpConnection -// auto ncb = [client](TcpConnection* conn) { client->OnConnect(conn); }; -// auto fcb = [client](EventLoop*, const char* ip, int port) { client->OnConnectFail(ip, port); }; -// -// if (!loop) { -// loop = ChooseNextWorkerEventLoop(); -// } -// client->SetLoop(loop); -// Connect(ip, port, std::move(ncb), std::move(fcb), loop); -// -// return client; -// } -// -// void IOThreadPool::Reset() { -// state_ = State::kNone; -// BaseLoop()->Reset(); -// } -// -// void WorkIOThreadPool::PushWriteTask(std::shared_ptr client) { -// auto pos = ++counter_ % worker_num_; -// std::unique_lock lock(*writeMutex_[pos]); -// -// writeQueue_[pos].emplace_back(client); -// writeCond_[pos]->notify_one(); -// } -// -// void WorkIOThreadPool::StartWorkers() { -// // only called by main thread -// assert(state_ == State::kNone); -// -// IOThreadPool::StartWorkers(); -// -// writeMutex_.reserve(worker_num_); -// writeCond_.reserve(worker_num_); -// writeQueue_.reserve(worker_num_); -// for (size_t index = 0; index < worker_num_; ++index) { -// writeMutex_.emplace_back(std::make_unique()); -// writeCond_.emplace_back(std::make_unique()); -// writeQueue_.emplace_back(); -// -// std::thread t([this, index]() { -// while (writeRunning_) { -// std::unique_lock lock(*writeMutex_[index]); -// while (writeQueue_[index].empty()) { -// if (!writeRunning_) { -// break; -// } -// writeCond_[index]->wait(lock); -// } -// if (!writeRunning_) { -// break; -// } -// auto client = writeQueue_[index].front(); -// if (client->State() == ClientState::kOK) { -// client->WriteReply2Client(); -// } -// writeQueue_[index].pop_front(); -// } -// INFO("worker write thread {}, goodbye...", index); -// }); -// -// INFO("worker write thread {}, starting...", index); -// writeThreads_.push_back(std::move(t)); -// } -// } -// -// void WorkIOThreadPool::Exit() { -// IOThreadPool::Exit(); -// -// writeRunning_ = false; -// int i = 0; -// for (auto& cond : writeCond_) { -// std::unique_lock lock(*writeMutex_[i++]); -// cond->notify_all(); -// } -// for (auto& wt : writeThreads_) { -// if (wt.joinable()) { -// wt.join(); -// } -// } -// writeThreads_.clear(); -// writeCond_.clear(); -// writeQueue_.clear(); -// writeMutex_.clear(); -// } -// -// } // namespace pikiwidb diff --git a/src/io_thread_pool.h b/src/io_thread_pool.h deleted file mode 100644 index 1ac9f4830..000000000 --- a/src/io_thread_pool.h +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - */ - -#pragma once - -// #include -// #include -// #include -// #include -// #include -// -// #include "client.h" -// #include "cmd_thread_pool.h" -// #include "net/event_loop.h" -// #include "net/http_client.h" -// #include "net/http_server.h" -// -// namespace pikiwidb { -// -// class IOThreadPool { -// public: -// IOThreadPool() = default; -// ~IOThreadPool() = default; -// -// static size_t GetMaxWorkerNum() { return kMaxWorkers; } -// -// bool Init(const char* ip, int port, const NewTcpConnectionCallback& ccb); -// void Run(int argc, char* argv[]); -// virtual void Exit(); -// bool IsExit() const; -// EventLoop* BaseLoop(); -// -// // choose a loop -// EventLoop* ChooseNextWorkerEventLoop(); -// -// // set worker threads, each thread has a EventLoop object -// bool SetWorkerNum(size_t n); -// -// // app name, for top command -// void SetName(const std::string& name); -// -// // TCP server -// bool Listen(const char* ip, int port, const NewTcpConnectionCallback& ccb); -// -// // TCP client -// void Connect(const char* ip, int port, const NewTcpConnectionCallback& ccb, -// const TcpConnectionFailCallback& fcb = TcpConnectionFailCallback(), EventLoop* loop = nullptr); -// -// // HTTP server -// std::shared_ptr ListenHTTP(const char* ip, int port, -// HttpServer::OnNewClient cb = HttpServer::OnNewClient()); -// -// // HTTP client -// std::shared_ptr ConnectHTTP(const char* ip, int port, EventLoop* loop = nullptr); -// -// virtual void PushWriteTask(std::shared_ptr /*unused*/){}; -// -// // for unittest only -// void Reset(); -// -// protected: -// virtual void StartWorkers(); -// -// static const size_t kMaxWorkers; -// -// std::string name_; -// std::string listen_ip_; -// int listen_port_{0}; -// NewTcpConnectionCallback new_conn_cb_; -// -// EventLoop base_; -// -// std::atomic worker_num_{0}; -// std::vector worker_threads_; -// std::vector> worker_loops_; -// mutable std::atomic current_worker_loop_{0}; -// -// enum class State { -// kNone, -// kStarted, -// kStopped, -// }; -// std::atomic state_{State::kNone}; -// }; -// -// class WorkIOThreadPool : public IOThreadPool { -// public: -// WorkIOThreadPool() = default; -// ~WorkIOThreadPool() = default; -// -// void Exit() override; -// void PushWriteTask(std::shared_ptr client) override; -// -// private: -// void StartWorkers() override; -// -// private: -// std::vector writeThreads_; -// std::vector> writeMutex_; -// std::vector> writeCond_; -// std::vector>> writeQueue_; -// std::atomic counter_ = 0; -// bool writeRunning_ = true; -// }; -// -// } // namespace pikiwidb diff --git a/src/pikiwidb.h b/src/pikiwidb.h index 367838959..3f354ee01 100644 --- a/src/pikiwidb.h +++ b/src/pikiwidb.h @@ -8,7 +8,6 @@ #include "cmd_table_manager.h" #include "cmd_thread_pool.h" #include "common.h" -#include "io_thread_pool.h" #include "net/event_server.h" #define KPIKIWIDB_VERSION "4.0.0" diff --git a/src/pubsub.cc b/src/pubsub.cc index 88ad25694..ad52e9c46 100644 --- a/src/pubsub.cc +++ b/src/pubsub.cc @@ -8,7 +8,6 @@ #include #include "client.h" -// #include "event_loop.h" #include "log.h" #include "pubsub.h" diff --git a/src/replication.cc b/src/replication.cc index 1971e09db..35f7edcbe 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -10,9 +10,7 @@ #include "client.h" #include "config.h" -// #include "event_loop.h" #include "log.h" -// #include "net/util.h" #include "pikiwidb.h" #include "pstd/pstd_string.h" #include "replication.h"