Skip to content

Commit

Permalink
Feat: Implemented Least-connection algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
bersen66 committed Apr 29, 2024
1 parent 2829806 commit 35aaf73
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 25 deletions.
8 changes: 4 additions & 4 deletions configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ acceptor:
ip_version: 4

load_balancing:
algorithm: ip_hash
algorithm: least_connections
endpoints:
- ip: "127.0.0.1"
port: 8001
port: 8081
- ip: "127.0.0.2"
port: 8002
port: 8082
- ip: "127.0.0.3"
port: 8003
port: 8083
27 changes: 19 additions & 8 deletions src/lb/tcp/connector.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include <lb/logging.hpp>
#include <lb/tcp/connector.hpp>
#include <lb/tcp/session.hpp>
#include "connector.hpp"

namespace lb::tcp {

Expand All @@ -11,6 +10,16 @@ Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector)
, selector(selector)
{}

SessionPtr MakeSession(SelectorPtr& selector, boost::asio::ip::tcp::socket client_socket, boost::asio::ip::tcp::socket server_socket, Backend backend)
{
if (selector->Type() == SelectorType::LEAST_CONNECTIONS) {
std::shared_ptr<LeastConnectionsSelector> lc_selector = std::dynamic_pointer_cast<LeastConnectionsSelector>(selector);
return std::make_shared<LeastConnectionsHttpSession>(std::move(client_socket), std::move(server_socket), lc_selector, backend);
}
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket));
}


void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
{
// TODO: selection of backend
Expand All @@ -28,13 +37,13 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
if (error) {
ERROR("{}", error.message());
if (error == boost::asio::error::connection_refused) {
selector->ExcludeBackend(backend);
selector->ExcludeBackend(std::move(backend));
MakeAndRunSession(std::move(client_socket));
}
return;
}
auto connection = std::make_shared<HttpSession>(std::move(client_socket), std::move(*server_socket));
connection->Run();
SessionPtr session = MakeSession(selector, std::move(client_socket), std::move(*server_socket), std::move(backend));
session->Run();
});
} else if (backend.IsUrl()) {
DEBUG("Is url");
Expand All @@ -45,7 +54,8 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
ResolverQuery resolve_query{url.Hostname(), url.Port()};
resolver.async_resolve(
resolve_query,
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)](const boost::system::error_code& error, ResolverResults resolver_results) mutable
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)]
(const boost::system::error_code& error, ResolverResults resolver_results) mutable
{
if (error) {
ERROR("Resolve error: {}", error.message());
Expand All @@ -59,14 +69,15 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
boost::asio::async_connect(
*server_socket,
resolver_results,
[this, server_socket, client_socket=std::move(client_socket)] (const boost::system::error_code& error, const boost::asio::ip::tcp::endpoint& endpoint) mutable
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)]
(const boost::system::error_code& error, const boost::asio::ip::tcp::endpoint& endpoint) mutable
{
if (error) {
ERROR("{}", error.message());
return;
}
auto connection = std::make_shared<HttpSession>(std::move(client_socket), std::move(*server_socket));
connection->Run();
SessionPtr session = MakeSession(selector, std::move(client_socket), std::move(*server_socket), std::move(backend));
session->Run();
});
});
}
Expand Down
1 change: 1 addition & 0 deletions src/lb/tcp/connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <boost/asio.hpp>
#include <lb/tcp/selectors.hpp>
#include <lb/tcp/session.hpp>

#include <memory>

Expand Down
103 changes: 101 additions & 2 deletions src/lb/tcp/selectors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ SelectorPtr DetectSelector(const YAML::Node& node)
{"round_robin", SelectorType::ROUND_ROBIN},
{"weighted_round_robin", SelectorType::WEIGHTED_ROUND_ROBIN},
{"consistent_hash", SelectorType::CONSISTENT_HASH},
{"ip_hash", SelectorType::IP_HASH}
{"ip_hash", SelectorType::IP_HASH},
{"least_connections", SelectorType::LEAST_CONNECTIONS},

};

if (!balancing_node["algorithm"].IsDefined()) {
Expand Down Expand Up @@ -127,7 +129,12 @@ SelectorPtr DetectSelector(const YAML::Node& node)
ch->Configure(balancing_node);
return ch;
} break;

case SelectorType::LEAST_CONNECTIONS: {
SelectorPtr lc = std::make_shared<LeastConnectionsSelector>();
lc->Configure(balancing_node);
return lc;
}
break;
default: {
STACKTRACE("Selector {} is not implemented", name);
}
Expand Down Expand Up @@ -433,4 +440,96 @@ SelectorType ConsistentHashSelector::Type() const
return SelectorType::CONSISTENT_HASH;
}

// ============================ LeastConnectionsSelector ============================

void LeastConnectionsSelector::Configure(const YAML::Node& config) {
if (!config["endpoints"].IsDefined()) {
STACKTRACE("Least connections endpoints node is missed");
}
const YAML::Node& ep_node = config["endpoints"];
if (!ep_node.IsSequence()) {
EXCEPTION("endpoints node must be a sequence");
}

for (const YAML::Node& ep : ep_node) {
if (ep["url"].IsDefined()) {
InsertBackend(Backend(ep["url"].as<std::string>()));
continue;
}

if (!ep["ip"].IsDefined()) {
STACKTRACE("{} missed {} field", ep, "ip");
}
if (!ep["port"].IsDefined()) {
STACKTRACE("{} missed {} field", ep, "port");
}

InsertBackend(Backend(ep["ip"].as<std::string>(), ep["port"].as<int>()));
}
}


void LeastConnectionsSelector::InsertBackend(Backend&& b)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
HandleType handle = backends_.push(CounterWrapper{.b = std::move(b),
.counter = 0});
handle_pool_[(*handle).b.ToString()] = std::move(handle);
}


Backend LeastConnectionsSelector::SelectBackend(const boost::asio::ip::tcp::endpoint& notused)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
Backend result = backends_.top().b;
IncreaseConnectionCount(backends_.top().b);
return result;
}

void LeastConnectionsSelector::ExcludeBackend(const Backend& backend)
{
boost::recursive_mutex::scoped_lock lock(mutex_);

const std::string& str = backend.ToString();
if (auto it = handle_pool_.find(str); it != handle_pool_.end()) {
HandleType& handle = it->second;

backends_.erase(handle);
handle_pool_.erase(str);

if (backends_.empty()) {
EXCEPTION("All backends are excluded!");
}
}
}

SelectorType LeastConnectionsSelector::Type() const
{
return SelectorType::LEAST_CONNECTIONS;
}

void LeastConnectionsSelector::IncreaseConnectionCount(const Backend& backend)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
const std::string& str = backend.ToString();
if (auto it = handle_pool_.find(str); it != handle_pool_.end()) {
HandleType& handle = it->second;
(*handle).counter++;
backends_.decrease(handle);
}
}

void LeastConnectionsSelector::DecreaseConnectionCount(const Backend& backend)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
const std::string& str = backend.ToString();
if (auto it = handle_pool_.find(str); it != handle_pool_.end()) {
HandleType& handle = it->second;
if ((*handle).counter > 0) {
(*handle).counter--;
backends_.decrease(handle);
}
}
}

} // namespace lb::tcp
54 changes: 51 additions & 3 deletions src/lb/tcp/selectors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
#include <memory>
#include <ostream>
#include <variant>
#include <unordered_map>

#include <yaml-cpp/yaml.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <pumba/pumba.hpp> // for pumba::ConsistentHashingRouter
#include <boost/heap/pairing_heap.hpp> // for boost::heap::pairing_heap
#include <lb/url.hpp>
#include <pumba/pumba.hpp>


namespace lb::tcp {

Expand Down Expand Up @@ -38,13 +41,22 @@ class Backend {
std::variant<EndpointType, UrlType> value;
};


struct BackendHasher {
std::size_t operator()(const lb::tcp::Backend& backend) const {
static std::hash<std::string> hash{};
return hash(backend.ToString());
}
};

std::ostream& operator<<(std::ostream& out, const Backend& backend);

enum class SelectorType {
ROUND_ROBIN=0,
WEIGHTED_ROUND_ROBIN,
IP_HASH,
CONSISTENT_HASH,
LEAST_CONNECTIONS,
};

struct ISelector {
Expand Down Expand Up @@ -146,4 +158,40 @@ class ConsistentHashSelector final : public ISelector {
};


} // namespace lb::tcp

class LeastConnectionsSelector final : public ISelector {
public:
void Configure(const YAML::Node& config) override;
Backend SelectBackend(const boost::asio::ip::tcp::endpoint& client_address) override;
void ExcludeBackend(const Backend& backend) override;
SelectorType Type() const override;
void IncreaseConnectionCount(const Backend& backend);
void DecreaseConnectionCount(const Backend& backend);
private:

struct CounterWrapper {
lb::tcp::Backend b;
std::size_t counter = 0;
};

struct ConnectionsCompare {
bool operator()(const CounterWrapper& lhs, const CounterWrapper& rhs) const
{
return lhs.counter > rhs.counter;
}
};

using PairingMap = boost::heap::pairing_heap<CounterWrapper, boost::heap::compare<ConnectionsCompare>>;
using HandleType = PairingMap::handle_type;

private:

void InsertBackend(Backend&& b);

private:
boost::recursive_mutex mutex_;
std::unordered_map<std::string, HandleType> handle_pool_;
PairingMap backends_;
};

} // namespace lb::tcp
Loading

0 comments on commit 35aaf73

Please sign in to comment.