Skip to content

Commit

Permalink
feat: use RTC(Run-to-completion) model to speed up cache read (OpenAt…
Browse files Browse the repository at this point in the history
…omFoundation#2837)

Co-authored-by: chenbt <34958405+chenbt-hz@users.noreply.github.com>
  • Loading branch information
2 people authored and brother-jin committed Aug 6, 2024
1 parent aa5ecfe commit c50c6c7
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 18 deletions.
9 changes: 9 additions & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct TimeStat {
void Reset() {
enqueue_ts_ = dequeue_ts_ = 0;
process_done_ts_ = 0;
before_queue_ts_ = 0;
}

uint64_t start_ts() const {
Expand All @@ -37,8 +38,13 @@ struct TimeStat {
return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0;
}

uint64_t before_queue_time() const {
return process_done_ts_ > dequeue_ts_ ? before_queue_ts_ - enqueue_ts_ : 0;
}

uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
uint64_t before_queue_ts_;
uint64_t process_done_ts_;
};

Expand Down Expand Up @@ -67,8 +73,11 @@ class PikaClientConn : public net::RedisConn {
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() = default;

bool IsInterceptedByRTC(std::string& opt);

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
Expand Down
10 changes: 9 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";

/*
* If a type holds a key, a new data structure
* that uses the key will use this error
*/
constexpr const char* ErrTypeMessage = "Invalid argument: WRONGTYPE";

using PikaCmdArgsType = net::RedisCmdArgsType;
static const int RAW_ARGS_LEN = 1024 * 1024;

Expand Down Expand Up @@ -284,7 +290,7 @@ enum CmdFlags {
kCmdFlagsOperateKey = (1 << 19), // redis keySpace
kCmdFlagsStream = (1 << 20),
kCmdFlagsFast = (1 << 21),
kCmdFlagsSlow = (1 << 22),
kCmdFlagsSlow = (1 << 22)
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -526,6 +532,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool hasFlag(uint32_t flag) const;
bool is_read() const;
bool is_write() const;
bool isCacheRead() const;

bool IsLocal() const;
bool IsSuspend() const;
Expand Down Expand Up @@ -568,6 +575,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
void ProcessCommand(const HintKeys& hint_key = HintKeys());
void InternalProcessCommand(const HintKeys& hint_key);
void DoCommand(const HintKeys& hint_key);
bool DoReadCommandInCache();
void LogCommand() const;

std::string name_;
Expand Down
73 changes: 72 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "include/pika_server.h"
#include "net/src/dispatch_thread.h"
#include "net/src/worker_thread.h"
#include "src/pstd/include/scope_record_lock.h"

extern std::unique_ptr<PikaConf> g_pika_conf;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -232,6 +233,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_dur
LOG(ERROR) << "ip_port: " << ip_port() << ", db: " << current_db_ << ", command:" << slow_log
<< ", command_size: " << cmd_size - 1 << ", arguments: " << argv.size()
<< ", total_time(ms): " << time_stat_->total_time() / 1000
<< ", before_queue_time(ms): " << time_stat_->before_queue_time() / 1000
<< ", queue_time(ms): " << time_stat_->queue_time() / 1000
<< ", process_time(ms): " << time_stat_->process_time() / 1000
<< ", cmd_time(ms): " << do_duration / 1000;
Expand All @@ -250,13 +252,24 @@ void PikaClientConn::ProcessMonitor(const PikaCmdArgsType& argv) {
g_pika_server->AddMonitorMessage(monitor_message);
}

bool PikaClientConn::IsInterceptedByRTC(std::string& opt) {
//currently we only Intercept: Get, HGet
if (opt == kCmdNameGet && g_pika_conf->GetCacheString()) {
return true;
}
if (opt == kCmdNameHGet && g_pika_conf->GetCacheHash()) {
return true;
}
return false;
}

void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) {
time_stat_->Reset();
if (async) {
auto arg = new BgTaskArg();
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = pstd::NowMicros();
time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
/**
* If using the pipeline method to transmit batch commands to Pika, it is unable to
Expand All @@ -268,6 +281,19 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);

//we don't intercept pipeline batch (argvs.size() > 1)
if (argvs.size() == 1 && IsInterceptedByRTC(opt) &&
PIKA_CACHE_NONE != g_pika_conf->cache_mode() &&
!IsInTxn()) {
// read in cache
if (ReadCmdInCache(argvs[0], opt)) {
delete arg;
return;
}
time_stat_->before_queue_ts_ = pstd::NowMicros();
}

g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
Expand Down Expand Up @@ -303,6 +329,51 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>&
TryWriteResp();
}

bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) {
resp_num.store(1);
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
return false;
}
// Check authed
if (AuthRequired()) { // the user is not authed, need to do auth
if (!(c_ptr->flag() & kCmdFlagsNoAuth)) {
return false;
}
}
// Initial
c_ptr->Initial(argv, current_db_);
//acl check
int8_t subCmdIndex = -1;
std::string errKey;
auto checkRes = user_->CheckUserPermission(c_ptr, argv, subCmdIndex, &errKey);
std::string object;
if (checkRes == AclDeniedCmd::CMD ||
checkRes == AclDeniedCmd::KEY ||
checkRes == AclDeniedCmd::CHANNEL ||
checkRes == AclDeniedCmd::NO_SUB_CMD ||
checkRes == AclDeniedCmd::NO_AUTH
) {
//acl check failed
return false;
}
//only read command(Get, HGet) will reach here, no need of record lock
if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) {
return false;
}
bool read_status = c_ptr->DoReadCommandInCache();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
resp_num--;
if (read_status) {
time_stat_->process_done_ts_ = pstd::NowMicros();
(*cmdstat_map)[argv[0]].cmd_count.fetch_add(1);
(*cmdstat_map)[argv[0]].cmd_time_consuming.fetch_add(time_stat_->total_time());
resp_array.emplace_back(std::make_shared<std::string>(std::move(c_ptr->res().message())));
TryWriteResp();
}
return read_status;
}

void PikaClientConn::TryWriteResp() {
int expected = 0;
if (resp_num.compare_exchange_strong(expected, -1)) {
Expand Down
55 changes: 39 additions & 16 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSet, std::move(setptr)));
////GetCmd
std::unique_ptr<Cmd> getptr =
std::make_unique<GetCmd>(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow);
std::make_unique<GetCmd>(kCmdNameGet, 2, kCmdFlagsRead | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameGet, std::move(getptr)));
////DelCmd
std::unique_ptr<Cmd> delptr =
Expand Down Expand Up @@ -397,15 +397,15 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHSet, std::move(hsetptr)));
////HGetCmd
std::unique_ptr<Cmd> hgetptr =
std::make_unique<HGetCmd>(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HGetCmd>(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGet, std::move(hgetptr)));
////HGetallCmd
std::unique_ptr<Cmd> hgetallptr =
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGetall, std::move(hgetallptr)));
////HExistsCmd
std::unique_ptr<Cmd> hexistsptr =
std::make_unique<HExistsCmd>(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HExistsCmd>(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHExists, std::move(hexistsptr)));
////HIncrbyCmd
std::unique_ptr<Cmd> hincrbyptr =
Expand All @@ -425,7 +425,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHLen, std::move(hlenptr)));
////HMgetCmd
std::unique_ptr<Cmd> hmgetptr =
std::make_unique<HMgetCmd>(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsFast);
std::make_unique<HMgetCmd>(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache |kCmdFlagsFast);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHMget, std::move(hmgetptr)));
////HMsetCmd
std::unique_ptr<Cmd> hmsetptr =
Expand Down Expand Up @@ -741,50 +741,50 @@ void InitCmdTable(CmdTable* cmd_table) {
// PubSub
////Publish
std::unique_ptr<Cmd> publishptr =
std::make_unique<PublishCmd>(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast);
std::make_unique<PublishCmd>(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePublish, std::move(publishptr)));
////Subscribe
std::unique_ptr<Cmd> subscribeptr =
std::make_unique<SubscribeCmd>(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<SubscribeCmd>(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSubscribe, std::move(subscribeptr)));
////UnSubscribe
std::unique_ptr<Cmd> unsubscribeptr =
std::make_unique<UnSubscribeCmd>(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<UnSubscribeCmd>(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameUnSubscribe, std::move(unsubscribeptr)));
////PSubscribe
std::unique_ptr<Cmd> psubscribeptr =
std::make_unique<PSubscribeCmd>(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PSubscribeCmd>(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePSubscribe, std::move(psubscribeptr)));
////PUnSubscribe
std::unique_ptr<Cmd> punsubscribeptr =
std::make_unique<PUnSubscribeCmd>(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PUnSubscribeCmd>(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePUnSubscribe, std::move(punsubscribeptr)));
////PubSub
std::unique_ptr<Cmd> pubsubptr =
std::make_unique<PubSubCmd>(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow);
std::make_unique<PubSubCmd>(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePubSub, std::move(pubsubptr)));

////ACL
std::unique_ptr<Cmd> aclptr = std::make_unique<PikaAclCmd>(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow);
std::unique_ptr<Cmd> aclptr = std::make_unique<PikaAclCmd>(KCmdNameAcl, -2, kCmdFlagsAdmin | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(KCmdNameAcl, std::move(aclptr)));

// Transaction
////Multi
std::unique_ptr<Cmd> multiptr =
std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast);
std::make_unique<MultiCmd>(kCmdNameMulti, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameMulti, std::move(multiptr)));
////Exec
std::unique_ptr<Cmd> execptr = std::make_unique<ExecCmd>(
kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow);
kCmdNameExec, 1, kCmdFlagsRead | kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsSlow );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameExec, std::move(execptr)));
////Discard
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> discardptr = std::make_unique<DiscardCmd>(kCmdNameDiscard, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameDiscard, std::move(discardptr)));
////Watch
std::unique_ptr<Cmd> watchptr = std::make_unique<WatchCmd>(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> watchptr = std::make_unique<WatchCmd>(kCmdNameWatch, -2, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameWatch, std::move(watchptr)));
////Unwatch
std::unique_ptr<Cmd> unwatchptr = std::make_unique<UnwatchCmd>(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast);
std::unique_ptr<Cmd> unwatchptr = std::make_unique<UnwatchCmd>(kCmdNameUnWatch, 1, kCmdFlagsRead | kCmdFlagsFast );
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameUnWatch, std::move(unwatchptr)));

// Stream
Expand Down Expand Up @@ -918,6 +918,29 @@ void Cmd::DoCommand(const HintKeys& hint_keys) {
}
}

bool Cmd::DoReadCommandInCache() {
if (!IsSuspend()) {
db_->DBLockShared();
}
DEFER {
if (!IsSuspend()) {
db_->DBUnlockShared();
}
};

if (db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (IsNeedReadCache()) {
ReadCache();
}
// return true only the read command hit
if (is_read() && !res().CacheMiss()) {
return true;
}
}
return false;
}


void Cmd::DoBinlog() {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
Expand Down

0 comments on commit c50c6c7

Please sign in to comment.