From 35aaf7339d030a457e7a6795e7e567ddaa76b8b5 Mon Sep 17 00:00:00 2001 From: ortur Date: Mon, 29 Apr 2024 19:14:19 +0300 Subject: [PATCH] Feat: Implemented Least-connection algorithm --- configs/config.yaml | 8 +- src/lb/tcp/connector.cpp | 27 +++++-- src/lb/tcp/connector.hpp | 1 + src/lb/tcp/selectors.cpp | 103 +++++++++++++++++++++++++- src/lb/tcp/selectors.hpp | 54 +++++++++++++- src/lb/tcp/session.cpp | 147 +++++++++++++++++++++++++++++++++++++ src/lb/tcp/session.hpp | 63 ++++++++++++++-- src/lb/url.cpp | 3 +- tests/learn_boost_heap.cpp | 39 ++++++++++ tests/test_selectors.cpp | 48 ++++++++++++ 10 files changed, 468 insertions(+), 25 deletions(-) create mode 100644 tests/learn_boost_heap.cpp diff --git a/configs/config.yaml b/configs/config.yaml index 8f54400..32cae59 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -13,11 +13,11 @@ acceptor: ip_version: 4 load_balancing: - algorithm: ip_hash + algorithm: least_connections endpoints: - ip: "127.0.0.1" - port: 8001 + port: 8081 - ip: "127.0.0.2" - port: 8002 + port: 8082 - ip: "127.0.0.3" - port: 8003 \ No newline at end of file + port: 8083 \ No newline at end of file diff --git a/src/lb/tcp/connector.cpp b/src/lb/tcp/connector.cpp index 096ea07..357b9fa 100644 --- a/src/lb/tcp/connector.cpp +++ b/src/lb/tcp/connector.cpp @@ -1,7 +1,6 @@ #include #include #include -#include "connector.hpp" namespace lb::tcp { @@ -11,6 +10,16 @@ Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector) , selector(selector) {} +SessionPtr MakeSession(SelectorPtr& selector, boost::asio::ip::tcp::socket client_socket, boost::asio::ip::tcp::socket server_socket, Backend backend) +{ + if (selector->Type() == SelectorType::LEAST_CONNECTIONS) { + std::shared_ptr lc_selector = std::dynamic_pointer_cast(selector); + return std::make_shared(std::move(client_socket), std::move(server_socket), lc_selector, backend); + } + return std::make_shared(std::move(client_socket), std::move(server_socket)); +} + + void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket) { // TODO: selection of backend @@ -28,13 +37,13 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket) if (error) { ERROR("{}", error.message()); if (error == boost::asio::error::connection_refused) { - selector->ExcludeBackend(backend); + selector->ExcludeBackend(std::move(backend)); MakeAndRunSession(std::move(client_socket)); } return; } - auto connection = std::make_shared(std::move(client_socket), std::move(*server_socket)); - connection->Run(); + SessionPtr session = MakeSession(selector, std::move(client_socket), std::move(*server_socket), std::move(backend)); + session->Run(); }); } else if (backend.IsUrl()) { DEBUG("Is url"); @@ -45,7 +54,8 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket) ResolverQuery resolve_query{url.Hostname(), url.Port()}; resolver.async_resolve( resolve_query, - [this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)](const boost::system::error_code& error, ResolverResults resolver_results) mutable + [this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)] + (const boost::system::error_code& error, ResolverResults resolver_results) mutable { if (error) { ERROR("Resolve error: {}", error.message()); @@ -59,14 +69,15 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket) boost::asio::async_connect( *server_socket, resolver_results, - [this, server_socket, client_socket=std::move(client_socket)] (const boost::system::error_code& error, const boost::asio::ip::tcp::endpoint& endpoint) mutable + [this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)] + (const boost::system::error_code& error, const boost::asio::ip::tcp::endpoint& endpoint) mutable { if (error) { ERROR("{}", error.message()); return; } - auto connection = std::make_shared(std::move(client_socket), std::move(*server_socket)); - connection->Run(); + SessionPtr session = MakeSession(selector, std::move(client_socket), std::move(*server_socket), std::move(backend)); + session->Run(); }); }); } diff --git a/src/lb/tcp/connector.hpp b/src/lb/tcp/connector.hpp index f95f13e..e8b3b17 100644 --- a/src/lb/tcp/connector.hpp +++ b/src/lb/tcp/connector.hpp @@ -2,6 +2,7 @@ #include #include +#include #include diff --git a/src/lb/tcp/selectors.cpp b/src/lb/tcp/selectors.cpp index 33519ea..71ebdae 100644 --- a/src/lb/tcp/selectors.cpp +++ b/src/lb/tcp/selectors.cpp @@ -87,7 +87,9 @@ SelectorPtr DetectSelector(const YAML::Node& node) {"round_robin", SelectorType::ROUND_ROBIN}, {"weighted_round_robin", SelectorType::WEIGHTED_ROUND_ROBIN}, {"consistent_hash", SelectorType::CONSISTENT_HASH}, - {"ip_hash", SelectorType::IP_HASH} + {"ip_hash", SelectorType::IP_HASH}, + {"least_connections", SelectorType::LEAST_CONNECTIONS}, + }; if (!balancing_node["algorithm"].IsDefined()) { @@ -127,7 +129,12 @@ SelectorPtr DetectSelector(const YAML::Node& node) ch->Configure(balancing_node); return ch; } break; - + case SelectorType::LEAST_CONNECTIONS: { + SelectorPtr lc = std::make_shared(); + lc->Configure(balancing_node); + return lc; + } + break; default: { STACKTRACE("Selector {} is not implemented", name); } @@ -433,4 +440,96 @@ SelectorType ConsistentHashSelector::Type() const return SelectorType::CONSISTENT_HASH; } +// ============================ LeastConnectionsSelector ============================ + +void LeastConnectionsSelector::Configure(const YAML::Node& config) { + if (!config["endpoints"].IsDefined()) { + STACKTRACE("Least connections endpoints node is missed"); + } + const YAML::Node& ep_node = config["endpoints"]; + if (!ep_node.IsSequence()) { + EXCEPTION("endpoints node must be a sequence"); + } + + for (const YAML::Node& ep : ep_node) { + if (ep["url"].IsDefined()) { + InsertBackend(Backend(ep["url"].as())); + continue; + } + + if (!ep["ip"].IsDefined()) { + STACKTRACE("{} missed {} field", ep, "ip"); + } + if (!ep["port"].IsDefined()) { + STACKTRACE("{} missed {} field", ep, "port"); + } + + InsertBackend(Backend(ep["ip"].as(), ep["port"].as())); + } +} + + +void LeastConnectionsSelector::InsertBackend(Backend&& b) +{ + boost::recursive_mutex::scoped_lock lock(mutex_); + HandleType handle = backends_.push(CounterWrapper{.b = std::move(b), + .counter = 0}); + handle_pool_[(*handle).b.ToString()] = std::move(handle); +} + + +Backend LeastConnectionsSelector::SelectBackend(const boost::asio::ip::tcp::endpoint& notused) +{ + boost::recursive_mutex::scoped_lock lock(mutex_); + Backend result = backends_.top().b; + IncreaseConnectionCount(backends_.top().b); + return result; +} + +void LeastConnectionsSelector::ExcludeBackend(const Backend& backend) +{ + boost::recursive_mutex::scoped_lock lock(mutex_); + + const std::string& str = backend.ToString(); + if (auto it = handle_pool_.find(str); it != handle_pool_.end()) { + HandleType& handle = it->second; + + backends_.erase(handle); + handle_pool_.erase(str); + + if (backends_.empty()) { + EXCEPTION("All backends are excluded!"); + } + } +} + +SelectorType LeastConnectionsSelector::Type() const +{ + return SelectorType::LEAST_CONNECTIONS; +} + +void LeastConnectionsSelector::IncreaseConnectionCount(const Backend& backend) +{ + boost::recursive_mutex::scoped_lock lock(mutex_); + const std::string& str = backend.ToString(); + if (auto it = handle_pool_.find(str); it != handle_pool_.end()) { + HandleType& handle = it->second; + (*handle).counter++; + backends_.decrease(handle); + } +} + +void LeastConnectionsSelector::DecreaseConnectionCount(const Backend& backend) +{ + boost::recursive_mutex::scoped_lock lock(mutex_); + const std::string& str = backend.ToString(); + if (auto it = handle_pool_.find(str); it != handle_pool_.end()) { + HandleType& handle = it->second; + if ((*handle).counter > 0) { + (*handle).counter--; + backends_.decrease(handle); + } + } +} + } // namespace lb::tcp diff --git a/src/lb/tcp/selectors.hpp b/src/lb/tcp/selectors.hpp index f7c6aee..0f27a34 100644 --- a/src/lb/tcp/selectors.hpp +++ b/src/lb/tcp/selectors.hpp @@ -3,12 +3,15 @@ #include #include #include +#include + #include #include #include +#include +#include // for pumba::ConsistentHashingRouter +#include // for boost::heap::pairing_heap #include -#include - namespace lb::tcp { @@ -38,6 +41,14 @@ class Backend { std::variant value; }; + +struct BackendHasher { + std::size_t operator()(const lb::tcp::Backend& backend) const { + static std::hash hash{}; + return hash(backend.ToString()); + } +}; + std::ostream& operator<<(std::ostream& out, const Backend& backend); enum class SelectorType { @@ -45,6 +56,7 @@ enum class SelectorType { WEIGHTED_ROUND_ROBIN, IP_HASH, CONSISTENT_HASH, + LEAST_CONNECTIONS, }; struct ISelector { @@ -146,4 +158,40 @@ class ConsistentHashSelector final : public ISelector { }; -} // namespace lb::tcp \ No newline at end of file + +class LeastConnectionsSelector final : public ISelector { +public: + void Configure(const YAML::Node& config) override; + Backend SelectBackend(const boost::asio::ip::tcp::endpoint& client_address) override; + void ExcludeBackend(const Backend& backend) override; + SelectorType Type() const override; + void IncreaseConnectionCount(const Backend& backend); + void DecreaseConnectionCount(const Backend& backend); +private: + + struct CounterWrapper { + lb::tcp::Backend b; + std::size_t counter = 0; + }; + + struct ConnectionsCompare { + bool operator()(const CounterWrapper& lhs, const CounterWrapper& rhs) const + { + return lhs.counter > rhs.counter; + } + }; + + using PairingMap = boost::heap::pairing_heap>; + using HandleType = PairingMap::handle_type; + +private: + + void InsertBackend(Backend&& b); + +private: + boost::recursive_mutex mutex_; + std::unordered_map handle_pool_; + PairingMap backends_; +}; + +} // namespace lb::tcp diff --git a/src/lb/tcp/session.cpp b/src/lb/tcp/session.cpp index e71aa22..635777f 100644 --- a/src/lb/tcp/session.cpp +++ b/src/lb/tcp/session.cpp @@ -8,6 +8,8 @@ namespace lb { namespace tcp { + +// ================================= HttpSession ================================= HttpSession::HttpSession(boost::asio::ip::tcp::socket client, boost::asio::ip::tcp::socket server) : BasicSession() @@ -166,6 +168,151 @@ const HttpSession::IdType& HttpSession::Id() const return id; } + +// ================================= LeastConnectionsHttpSession ================================= +LeastConnectionsHttpSession::LeastConnectionsHttpSession(boost::asio::ip::tcp::socket client, + boost::asio::ip::tcp::socket server, + SelectorType selector, Backend server_backend) + : BasicSession() + , client_socket(std::move(client)) + , server_socket(std::move(server)) + , id(generateId()) + , lc_selector(std::move(selector)) + , server_backend(std::move(server_backend)) +{ + cb.prepare(BUFFER_SIZE); + sb.prepare(BUFFER_SIZE); + DEBUG("LeastConnectionsHttpSession id:{} constructed", id); + // selector->IncreaseConnectionCount(server_backend); +} + +void LeastConnectionsHttpSession::Run() +{ + ClientRead(); + ServerRead(); +} + + +// Client->Server communication callbacks-chain +void LeastConnectionsHttpSession::ClientRead() +{ + cr.clear(); + boost::beast::http::async_read( + client_socket, + cb, + cr, + [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { + self->HandleClientRead(ec, length); + } + ); +} + +void LeastConnectionsHttpSession::HandleClientRead(boost::system::error_code ec, std::size_t length) +{ + if (ec) { + if (NeedErrorLogging(ec)) { + SERROR("sid:{} {}", id, ec.message()); + } + Cancel(); + return; + } + //DEBUG("sid:{} client-msg:{}", id, client_buffer); + SendToServer(); +} + +void LeastConnectionsHttpSession::SendToServer() +{ + boost::beast::http::async_write( + server_socket, + cr, + [self=shared_from_this()](boost::system::error_code ec, std::size_t length) { + self->HandleSendToServer(ec, length); + }); +} + + +void LeastConnectionsHttpSession::HandleSendToServer(boost::system::error_code ec, std::size_t length) +{ + if (ec) { + if (NeedErrorLogging(ec)) { + SERROR("sid:{} {}", id, ec.message()); + } + Cancel(); + return; + } + + ClientRead(); +} + +// Server->Client communication callbacks-chain +void LeastConnectionsHttpSession::ServerRead() +{ + sr.clear(); + boost::beast::http::async_read( + server_socket, + sb, + sr, + [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { + self->HandleServerRead(ec, length); + } + ); +} + +void LeastConnectionsHttpSession::HandleServerRead(boost::system::error_code ec, std::size_t length) +{ + if (ec) { + if (NeedErrorLogging(ec)) { + SERROR("sid:{} {}", id, ec.message()); + } + Cancel(); + return; + } + SendToClient(); +} + +void LeastConnectionsHttpSession::SendToClient() +{ + boost::beast::http::async_write(client_socket, sr, + [self=shared_from_this()](boost::system::error_code ec, std::size_t length){ + self->HandleSendToClient(ec, length); + }); +} + +void LeastConnectionsHttpSession::HandleSendToClient(boost::system::error_code ec, std::size_t length) { + if (ec) { + if (NeedErrorLogging(ec)) { + SERROR("sid:{} {}", id, ec.message()); + } + Cancel(); + } + ServerRead(); +} + +void LeastConnectionsHttpSession::Cancel() +{ + CloseSocket(client_socket); + CloseSocket(server_socket); + lc_selector->DecreaseConnectionCount(server_backend); +} + + +LeastConnectionsHttpSession::IdType LeastConnectionsHttpSession::generateId() +{ + static std::atomic id = 0; + LeastConnectionsHttpSession::IdType result = id.fetch_add(1, std::memory_order_relaxed); + return result; +} + +const LeastConnectionsHttpSession::IdType& LeastConnectionsHttpSession::Id() const +{ + return id; +} + +LeastConnectionsHttpSession::~LeastConnectionsHttpSession() +{ + Cancel(); + DEBUG("LeastConnectionsHttpSession id:{} destructed", id); +} } // namespace tcp } // namespace lb \ No newline at end of file diff --git a/src/lb/tcp/session.hpp b/src/lb/tcp/session.hpp index f7d8285..7353460 100644 --- a/src/lb/tcp/session.hpp +++ b/src/lb/tcp/session.hpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace lb { @@ -17,11 +18,11 @@ struct BasicSession { using SessionPtr = std::shared_ptr; -class HttpSession final : public BasicSession, - public std::enable_shared_from_this { +class HttpSession : public BasicSession, + public std::enable_shared_from_this { public: using IdType = std::size_t; - static constexpr inline std::size_t BUFFER_SIZE = 4096; + static constexpr const std::size_t BUFFER_SIZE = 4096; public: HttpSession(boost::asio::ip::tcp::socket client, boost::asio::ip::tcp::socket server); @@ -36,7 +37,7 @@ class HttpSession final : public BasicSession, void Cancel() override; const IdType& Id() const; -private: +protected: // Client->Server communication callbacks-chain void ClientRead(); @@ -49,9 +50,9 @@ class HttpSession final : public BasicSession, void HandleServerRead(boost::system::error_code ec, std::size_t length); void SendToClient(); void HandleSendToClient(boost::system::error_code ec, std::size_t length); -private: +protected: static IdType generateId(); -private: +protected: boost::asio::ip::tcp::socket client_socket; boost::asio::ip::tcp::socket server_socket; boost::asio::streambuf cb; @@ -62,6 +63,56 @@ class HttpSession final : public BasicSession, boost::mutex mutex; }; + +class LeastConnectionsHttpSession : public BasicSession, + public std::enable_shared_from_this { +public: + using IdType = std::size_t; + static constexpr const std::size_t BUFFER_SIZE = 4096; + using SelectorType = std::shared_ptr; +public: + LeastConnectionsHttpSession(boost::asio::ip::tcp::socket client, + boost::asio::ip::tcp::socket server, + SelectorType selector, Backend server_backend); + + LeastConnectionsHttpSession(const LeastConnectionsHttpSession&) = delete; + LeastConnectionsHttpSession& operator=(const LeastConnectionsHttpSession&) = delete; + ~LeastConnectionsHttpSession() noexcept; + + void Run() override; + + // Cancel all unfinished async operartions on boths sockets + void Cancel() override; + + const IdType& Id() const; +protected: + + // Client->Server communication callbacks-chain + void ClientRead(); + void HandleClientRead(boost::system::error_code ec, std::size_t length); + void SendToServer(); + void HandleSendToServer(boost::system::error_code ec, std::size_t length); + + // Server->Client communication callbacks-chain + void ServerRead(); + void HandleServerRead(boost::system::error_code ec, std::size_t length); + void SendToClient(); + void HandleSendToClient(boost::system::error_code ec, std::size_t length); +protected: + static IdType generateId(); +protected: + boost::asio::ip::tcp::socket client_socket; + boost::asio::ip::tcp::socket server_socket; + boost::asio::streambuf cb; + boost::asio::streambuf sb; + IdType id; + boost::beast::http::request cr; + boost::beast::http::response sr; + boost::mutex mutex; + SelectorType lc_selector; + Backend server_backend; +}; + } // namespace tcp diff --git a/src/lb/url.cpp b/src/lb/url.cpp index 1d599f6..253e082 100644 --- a/src/lb/url.cpp +++ b/src/lb/url.cpp @@ -80,8 +80,7 @@ namespace lb bool operator==(const Url &lhs, const Url &rhs) { - return std::tie(lhs.Protocol(), lhs.Hostname(), lhs.Port(), lhs.Path(), lhs.Query(), lhs.Fragment()) - == std::tie(rhs.Protocol(), rhs.Hostname(), rhs.Port(), rhs.Path(), rhs.Query(), rhs.Fragment()); + return lhs.ToString() == rhs.ToString(); } } // namespace lb \ No newline at end of file diff --git a/tests/learn_boost_heap.cpp b/tests/learn_boost_heap.cpp new file mode 100644 index 0000000..042a401 --- /dev/null +++ b/tests/learn_boost_heap.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include + +struct CounterWrapper { + lb::tcp::Backend* b; + std::size_t counter = 0; + std::size_t id = 0; +}; + +struct ConnectionsCompare { + bool operator()(const CounterWrapper& lhs, const CounterWrapper& rhs) const + { + return lhs.counter > rhs.counter; + } +}; + +TEST(PairingHeap, learnHowItWorks) +{ + boost::heap::pairing_heap> heap; + + auto handle1 = heap.push({nullptr, 0, 1}); + ASSERT_TRUE(heap.top().counter == 0); + ASSERT_TRUE(heap.top().id == 1); + + auto handle2 = heap.push({nullptr, 0, 2}); + ASSERT_TRUE(heap.top().counter == 0); + ASSERT_TRUE(heap.top().id == 1); + + (*handle1).counter++; + heap.increase(handle1); + ASSERT_TRUE(heap.top().counter == 0); + ASSERT_TRUE(heap.top().id == 2); + + heap.erase(handle2); + heap.erase(handle1); + ASSERT_TRUE(heap.empty()); +} \ No newline at end of file diff --git a/tests/test_selectors.cpp b/tests/test_selectors.cpp index c9d14f2..6eeadc3 100644 --- a/tests/test_selectors.cpp +++ b/tests/test_selectors.cpp @@ -269,4 +269,52 @@ R"(algorithm: weighted_round_robin lb::tcp::WeightedRoundRobinSelector selector; ASSERT_THROW( selector.Configure(selector_config), std::runtime_error); // No weights +} + +TEST(LeastConnections, basicUsage) +{ + YAML::Node selector_config = YAML::Load( +R"(algorithm: least_connections +endpoints: + - ip: "127.0.0.1" + port: 8081 + - ip: "127.0.0.2" + port: 8082 + - ip: "127.0.0.3" + port: 8083 +)"); + + lb::tcp::LeastConnectionsSelector selector; + selector.Configure(selector_config); + boost::asio::ip::tcp::endpoint notused(boost::asio::ip::address::from_string("127.0.0.1"), 8080); + auto b1 = lb::tcp::Backend("127.0.0.1", 8081); + auto b2 = lb::tcp::Backend("127.0.0.2", 8082); + auto b3 = lb::tcp::Backend("127.0.0.3", 8083); + + ASSERT_EQ(selector.SelectBackend(notused), b1); + ASSERT_EQ(selector.SelectBackend(notused), b3); + ASSERT_EQ(selector.SelectBackend(notused), b2); + + ASSERT_EQ(selector.SelectBackend(notused), b2); + ASSERT_EQ(selector.SelectBackend(notused), b3); + ASSERT_EQ(selector.SelectBackend(notused), b1); + + + selector.DecreaseConnectionCount(b1); + selector.DecreaseConnectionCount(b1); + + ASSERT_EQ(selector.SelectBackend(notused), b1); + ASSERT_EQ(selector.SelectBackend(notused), b1); + + selector.DecreaseConnectionCount(b2); + selector.DecreaseConnectionCount(b2); + + ASSERT_EQ(selector.SelectBackend(notused), b2); + ASSERT_EQ(selector.SelectBackend(notused), b2); + + selector.DecreaseConnectionCount(b3); + selector.DecreaseConnectionCount(b3); + + ASSERT_EQ(selector.SelectBackend(notused), b3); + ASSERT_EQ(selector.SelectBackend(notused), b3); } \ No newline at end of file