Skip to content

Commit

Permalink
refactor:Replacement network library (#349)
Browse files Browse the repository at this point in the history
* Replacement network library

Signed-off-by: HappyUncle <code4happy@gmail.com>
Co-authored-by: HappyUncle <code4happy@gmail.com>
  • Loading branch information
lqxhub and happy-v587 authored Aug 5, 2024
1 parent f2eaecf commit d7afaa5
Show file tree
Hide file tree
Showing 73 changed files with 3,021 additions and 4,115 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ INCLUDE(cmake/gflags.cmake)
INCLUDE(cmake/snappy.cmake)
INCLUDE(cmake/zlib.cmake)
INCLUDE(cmake/leveldb.cmake)
INCLUDE(cmake/libevent.cmake)
INCLUDE(cmake/llhttp.cmake)
INCLUDE(cmake/fmt.cmake)
INCLUDE(cmake/spdlog.cmake)
Expand Down
22 changes: 0 additions & 22 deletions cmake/libevent.cmake

This file was deleted.

191 changes: 68 additions & 123 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ static int ProcessMaster(const char* start, const char* end) {
}

int PClient::handlePacket(const char* start, int bytes) {
auto conn = getTcpConnection();
if (!conn) {
ERROR("BUG: conn can't be null when recv data");
return -1;
}
// auto conn = getTcpConnection();
// if (!conn) {
// ERROR("BUG: conn can't be null when recv data");
// return -1;
// }

s_current = this;

Expand All @@ -291,7 +291,7 @@ int PClient::handlePacket(const char* start, int bytes) {
auto parseRet = parser_.ParseRequest(ptr, end);
if (parseRet == PParseResult::kError) {
if (!parser_.IsInitialState()) {
conn->ActiveClose();
// g_pikiwidb->closeClient(this);
return 0;
}

Expand Down Expand Up @@ -331,18 +331,19 @@ int PClient::handlePacket(const char* start, int bytes) {
auto now = ::time(nullptr);
if (now <= last_auth_ + 1) {
// avoid guess password.
conn->ActiveClose();
g_pikiwidb->CloseConnection(shared_from_this());
return 0;
} else {
last_auth_ = now;
}
} else {
SetLineString("-NOAUTH Authentication required.");
SendPacket();
return static_cast<int>(ptr - start);
}
}

DEBUG("client {}, cmd {}", conn->GetUniqueId(), cmdName_);
// DEBUG("client {}, cmd {}", conn->GetUniqueId(), cmdName_);

FeedMonitors(params_);

Expand Down Expand Up @@ -420,33 +421,11 @@ void PClient::executeCommand() {

PClient* PClient::Current() { return s_current; }

PClient::PClient(TcpConnection* obj)
: tcp_connection_(std::static_pointer_cast<TcpConnection>(obj->shared_from_this())),
dbno_(0),
flag_(0),
name_("clientxxx"),
parser_(params_) {
PClient::PClient() : parser_(params_) {
auth_ = false;
reset();
}

int PClient::HandlePackets(pikiwidb::TcpConnection* obj, const char* start, int size) {
int total = 0;
while (total < size) {
auto processed = handlePacket(start + total, size - total);
if (processed <= 0) {
break;
}

total += processed;
}

// obj->SendPacket(Message());
// Clear();
// reply_.Clear();
return total;
}

void PClient::OnConnect() {
SetState(ClientState::kOK);
if (isPeerMaster()) {
Expand All @@ -470,69 +449,45 @@ void PClient::OnConnect() {
}
}

const std::string& PClient::PeerIP() const {
if (auto c = getTcpConnection(); c) {
return c->GetPeerIP();
std::string PClient::PeerIP() const {
if (!addr_.IsValid()) {
return "";
}

static const std::string kEmpty;
return kEmpty;
return addr_.GetIP();
}

int PClient::PeerPort() const {
if (auto c = getTcpConnection(); c) {
return c->GetPeerPort();
}

return -1;
}

bool PClient::SendPacket(const std::string& buf) {
if (auto c = getTcpConnection(); c) {
return c->SendPacket(buf);
if (!addr_.IsValid()) {
return 0;
}

return false;
return addr_.GetPort();
}

bool PClient::SendPacket(const void* data, size_t size) {
if (auto c = getTcpConnection(); c) {
return c->SendPacket(data, size);
}

return false;
bool PClient::SendPacket() {
std::string str;
message_.swap(str);
g_pikiwidb->SendPacket2Client(shared_from_this(), std::move(str));
SendOver();
return true;
}
bool PClient::SendPacket(UnboundedBuffer& data) {
if (auto c = getTcpConnection(); c) {
return c->SendPacket(data);
}

return false;
bool PClient::SendPacket(std::string&& msg) {
g_pikiwidb->SendPacket2Client(shared_from_this(), std::move(msg));
SendOver();
return true;
}

bool PClient::SendPacket(const evbuffer_iovec* iovecs, size_t nvecs) {
if (auto c = getTcpConnection(); c) {
return c->SendPacket(iovecs, nvecs);
}

return false;
bool PClient::SendPacket(UnboundedBuffer& data) {
g_pikiwidb->SendPacket2Client(shared_from_this(), std::move(data.ToString()));
SendOver();
return true;
}

void PClient::WriteReply2Client() {
if (auto c = getTcpConnection(); c) {
c->SendPacket(Message());
}
Clear();
reset();
}
void PClient::Close() { g_pikiwidb->CloseConnection(shared_from_this()); }

void PClient::Close() {
void PClient::OnClose() {
SetState(ClientState::kClosed);
reset();
if (auto c = getTcpConnection(); c) {
c->ActiveClose();
tcp_connection_.reset();
}
}

void PClient::reset() {
Expand All @@ -549,13 +504,7 @@ bool PClient::isClusterCmdTarget() const {
return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort();
}

int PClient::uniqueID() const {
if (auto c = getTcpConnection(); c) {
return c->GetUniqueId();
}

return -1;
}
uint64_t PClient::uniqueID() const { return GetConnId(); }

bool PClient::Watch(int dbno, const std::string& key) {
DEBUG("Client {} watch {}, db {}", name_, key, dbno);
Expand Down Expand Up @@ -641,29 +590,29 @@ void PClient::SetSlaveInfo() { slave_info_ = std::make_unique<PSlaveInfo>(); }

void PClient::TransferToSlaveThreads() {
// transfer to slave
auto tcp_connection = getTcpConnection();
if (!tcp_connection) {
return;
}
// auto tcp_connection = getTcpConnection();
// if (!tcp_connection) {
// return;
// }

auto loop = tcp_connection->GetEventLoop();
auto loop_name = loop->GetName();
if (loop_name.find("slave") == std::string::npos) {
auto slave_loop = tcp_connection->SelectSlaveEventLoop();
auto id = tcp_connection->GetUniqueId();
auto event_object = loop->GetEventObject(id);
auto del_conn = [loop, slave_loop, event_object]() {
loop->Unregister(event_object);
event_object->SetUniqueId(-1);
auto tcp_connection = std::dynamic_pointer_cast<TcpConnection>(event_object);
assert(tcp_connection);
tcp_connection->ResetEventLoop(slave_loop);

auto add_conn = [slave_loop, event_object]() { slave_loop->Register(event_object, 0); };
slave_loop->Execute(std::move(add_conn));
};
loop->Execute(std::move(del_conn));
}
// auto loop = tcp_connection->GetEventLoop();
// auto loop_name = loop->GetName();
// if (loop_name.find("slave") == std::string::npos) {
// auto slave_loop = tcp_connection->SelectSlaveEventLoop();
// auto id = tcp_connection->GetUniqueId();
// auto event_object = loop->GetEventObject(id);
// auto del_conn = [loop, slave_loop, event_object]() {
// loop->Unregister(event_object);
// event_object->SetUniqueId(-1);
// auto tcp_connection = std::dynamic_pointer_cast<TcpConnection>(event_object);
// assert(tcp_connection);
// tcp_connection->ResetEventLoop(slave_loop);
//
// auto add_conn = [slave_loop, event_object]() { slave_loop->Register(event_object, 0); };
// slave_loop->Execute(std::move(add_conn));
// };
// loop->Execute(std::move(del_conn));
// }
}

void PClient::AddCurrentToMonitor() {
Expand All @@ -681,34 +630,30 @@ void PClient::FeedMonitors(const std::vector<std::string>& params) {
}
}

char buf[512];
int n = snprintf(buf, sizeof buf, "+[db%d %s:%d]: \"", s_current->GetCurrentDB(), s_current->PeerIP().c_str(),
s_current->PeerPort());

assert(n > 0);
fmt::memory_buffer buf;
fmt::format_to(std::back_inserter(buf), "+[db{} {}:{}]: \"", s_current->GetCurrentDB(), s_current->PeerIP(),
s_current->PeerPort());

for (const auto& e : params) {
if (n < static_cast<int>(sizeof buf)) {
n += snprintf(buf + n, sizeof buf - n, "%s ", e.data());
} else {
break;
}
fmt::format_to(std::back_inserter(buf), "{} ", e);
}

--n; // no space follow last param
// remove the last space
if (!params.empty() && buf.size() > 0) {
buf.resize(buf.size() - 1);
}

{
std::unique_lock<std::mutex> guard(monitors_mutex);

for (auto it(monitors.begin()); it != monitors.end();) {
for (auto it = monitors.begin(); it != monitors.end();) {
auto m = it->lock();
if (m) {
m->SendPacket(buf, n);
m->SendPacket("\"" CRLF, 3);

fmt::format_to(std::back_inserter(buf), "\"\r\n");
m->SendPacket(fmt::to_string(buf));
++it;
} else {
monitors.erase(it++);
it = monitors.erase(it);
}
}
}
Expand Down
Loading

0 comments on commit d7afaa5

Please sign in to comment.