Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use RTC(Run-to-completion) model to speed up cache read #2837

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct TimeStat {
void Reset() {
enqueue_ts_ = dequeue_ts_ = 0;
process_done_ts_ = 0;
before_queue_ts_ = 0;
}

uint64_t start_ts() const {
Expand All @@ -37,8 +38,13 @@ struct TimeStat {
return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0;
}

uint64_t before_queue_time() const {
return process_done_ts_ > dequeue_ts_ ? before_queue_ts_ - enqueue_ts_ : 0;
}

uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
uint64_t before_queue_ts_;
uint64_t process_done_ts_;
};

Expand Down Expand Up @@ -67,8 +73,11 @@ class PikaClientConn : public net::RedisConn {
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() = default;

bool IsInterceptedByRTC(std::string& opt);

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
Expand Down
5 changes: 4 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";


/*
* If a type holds a key, a new data structure
* that uses the key will use this error
Expand Down Expand Up @@ -289,7 +290,7 @@ enum CmdFlags {
kCmdFlagsOperateKey = (1 << 19), // redis keySpace
kCmdFlagsStream = (1 << 20),
kCmdFlagsFast = (1 << 21),
kCmdFlagsSlow = (1 << 22),
kCmdFlagsSlow = (1 << 22)
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -536,6 +537,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool hasFlag(uint32_t flag) const;
bool is_read() const;
bool is_write() const;
bool isCacheRead() const;

bool IsLocal() const;
bool IsSuspend() const;
Expand Down Expand Up @@ -579,6 +581,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
void ProcessCommand(const HintKeys& hint_key = HintKeys());
void InternalProcessCommand(const HintKeys& hint_key);
void DoCommand(const HintKeys& hint_key);
bool DoReadCommandInCache();
void LogCommand() const;

std::string name_;
Expand Down
73 changes: 72 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "include/pika_server.h"
#include "net/src/dispatch_thread.h"
#include "net/src/worker_thread.h"
#include "src/pstd/include/scope_record_lock.h"

extern std::unique_ptr<PikaConf> g_pika_conf;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -237,6 +238,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur
LOG(ERROR) << "ip_port: " << ip_port() << ", db: " << current_db_ << ", command:" << slow_log
<< ", command_size: " << cmd_size - 1 << ", arguments: " << argv.size()
<< ", total_time(ms): " << time_stat_->total_time() / 1000
<< ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000
<< ", queue_time(ms): " << time_stat_->queue_time() / 1000
<< ", process_time(ms): " << time_stat_->process_time() / 1000
<< ", cmd_time(ms): " << do_duration / 1000;
Expand All @@ -255,13 +257,24 @@ void PikaClientConn::ProcessMonitor(const PikaCmdArgsType& argv) {
g_pika_server->AddMonitorMessage(monitor_message);
}

bool PikaClientConn::IsInterceptedByRTC(std::string& opt) {
//currently we only Intercept: Get, HGet
if (opt == kCmdNameGet && g_pika_conf->GetCacheString()) {
return true;
}
if (opt == kCmdNameHGet && g_pika_conf->GetCacheHash()) {
return true;
}
return false;
}

void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) {
time_stat_->Reset();
if (async) {
auto arg = new BgTaskArg();
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = pstd::NowMicros();
time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
/**
* If using the pipeline method to transmit batch commands to Pika, it is unable to
Expand All @@ -273,6 +286,19 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);

//we don't intercept pipeline batch (argvs.size() > 1)
if (argvs.size() == 1 && IsInterceptedByRTC(opt) &&
PIKA_CACHE_NONE != g_pika_conf->cache_mode() &&
!IsInTxn()) {
// read in cache
if (ReadCmdInCache(argvs[0], opt)) {
delete arg;
return;
}
time_stat_->before_queue_ts_ = pstd::NowMicros();
}

g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
Expand Down Expand Up @@ -308,6 +334,51 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>&
TryWriteResp();
}

bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) {
resp_num.store(1);
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
return false;
}
// Check authed
if (AuthRequired()) { // the user is not authed, need to do auth
if (!(c_ptr->flag() & kCmdFlagsNoAuth)) {
return false;
}
}
// Initial
c_ptr->Initial(argv, current_db_);
//acl check
int8_t subCmdIndex = -1;
std::string errKey;
auto checkRes = user_->CheckUserPermission(c_ptr, argv, subCmdIndex, &errKey);
std::string object;
if (checkRes == AclDeniedCmd::CMD ||
checkRes == AclDeniedCmd::KEY ||
checkRes == AclDeniedCmd::CHANNEL ||
checkRes == AclDeniedCmd::NO_SUB_CMD ||
checkRes == AclDeniedCmd::NO_AUTH
) {
//acl check failed
return false;
}
//only read command(Get, HGet) will reach here, no need of record lock
if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) {
return false;
}
bool read_status = c_ptr->DoReadCommandInCache();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
resp_num--;
if (read_status) {
time_stat_->process_done_ts_ = pstd::NowMicros();
(*cmdstat_map)[argv[0]].cmd_count.fetch_add(1);
(*cmdstat_map)[argv[0]].cmd_time_consuming.fetch_add(time_stat_->total_time());
resp_array.emplace_back(std::make_shared<std::string>(std::move(c_ptr->res().message())));
TryWriteResp();
}
return read_status;
}

void PikaClientConn::TryWriteResp() {
int expected = 0;
if (resp_num.compare_exchange_strong(expected, -1)) {
Expand Down
55 changes: 39 additions & 16 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSet, std::move(setptr)));
////GetCmd
std::unique_ptr<Cmd> getptr =
std::make_unique<GetCmd>(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow);
std::make_unique<GetCmd>(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameGet, std::move(getptr)));
////DelCmd
std::unique_ptr<Cmd> delptr =
Expand Down Expand Up @@ -392,15 +392,15 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHSet, std::move(hsetptr)));
////HGetCmd
std::unique_ptr<Cmd> hgetptr =
std::make_unique<HGetCmd>(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HGetCmd>(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGet, std::move(hgetptr)));
////HGetallCmd
std::unique_ptr<Cmd> hgetallptr =
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGetall, std::move(hgetallptr)));
////HExistsCmd
std::unique_ptr<Cmd> hexistsptr =
std::make_unique<HExistsCmd>(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HExistsCmd>(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHExists, std::move(hexistsptr)));
////HIncrbyCmd
std::unique_ptr<Cmd> hincrbyptr =
Expand All @@ -420,7 +420,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHLen, std::move(hlenptr)));
////HMgetCmd
std::unique_ptr<Cmd> hmgetptr =
std::make_unique<HMgetCmd>(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HMgetCmd>(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHMget, std::move(hmgetptr)));
////HMsetCmd
std::unique_ptr<Cmd> hmsetptr =
Expand Down Expand Up @@ -736,50 +736,50 @@ void InitCmdTable(CmdTable* cmd_table) {
// PubSub
////Publish
std::unique_ptr<Cmd> publishptr =
std::make_unique<PublishCmd>(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast);
std::make_unique<PublishCmd>(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePublish, std::move(publishptr)));
////Subscribe
std::unique_ptr<Cmd> subscribeptr =
std::make_unique<SubscribeCmd>(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<SubscribeCmd>(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSubscribe, std::move(subscribeptr)));
////UnSubscribe
std::unique_ptr<Cmd> unsubscribeptr =
std::make_unique<UnSubscribeCmd>(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<UnSubscribeCmd>(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameUnSubscribe, std::move(unsubscribeptr)));
////PSubscribe
std::unique_ptr<Cmd> psubscribeptr =
std::make_unique<PSubscribeCmd>(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PSubscribeCmd>(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePSubscribe, std::move(psubscribeptr)));
////PUnSubscribe
std::unique_ptr<Cmd> punsubscribeptr =
std::make_unique<PUnSubscribeCmd>(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PUnSubscribeCmd>(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePUnSubscribe, std::move(punsubscribeptr)));
////PubSub
std::unique_ptr<Cmd> pubsubptr =
std::make_unique<PubSubCmd>(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PubSubCmd>(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePubSub, std::move(pubsubptr)));

////ACL
std::unique_ptr<Cmd> aclptr = std::make_unique<PikaAclCmd>(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow);
std::unique_ptr<Cmd> aclptr = std::make_unique<PikaAclCmd>(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(KCmdNameAcl, std::move(aclptr)));

// Transaction
////Multi
std::unique_ptr<Cmd> multiptr =
std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast);
std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameMulti, std::move(multiptr)));
////Exec
std::unique_ptr<Cmd> execptr = std::make_unique<ExecCmd>(
kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow);
kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameExec, std::move(execptr)));
////Discard
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameDiscard, std::move(discardptr)));
////Watch
std::unique_ptr<Cmd> watchptr = std::make_unique<WatchCmd>(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> watchptr = std::make_unique<WatchCmd>(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameWatch, std::move(watchptr)));
////Unwatch
std::unique_ptr<Cmd> unwatchptr = std::make_unique<UnwatchCmd>(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> unwatchptr = std::make_unique<UnwatchCmd>(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameUnWatch, std::move(unwatchptr)));

// Stream
Expand Down Expand Up @@ -911,6 +911,29 @@ void Cmd::DoCommand(const HintKeys& hint_keys) {
}
}

bool Cmd::DoReadCommandInCache() {
if (!IsSuspend()) {
db_->DBLockShared();
}
DEFER {
if (!IsSuspend()) {
db_->DBUnlockShared();
}
};

if (db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (IsNeedReadCache()) {
ReadCache();
}
// return true only the read command hit
if (is_read() && !res().CacheMiss()) {
return true;
}
}
return false;
}


void Cmd::DoBinlog() {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
Expand Down
Loading