Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:add transfer Leader cmd #5963

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2840,6 +2840,26 @@ folly::Future<StatusOr<std::vector<cpp2::RoleItem>>> MetaClient::getUserRoles(st
return future;
}

folly::Future<StatusOr<bool>> MetaClient::transferLeader(const HostAddr& host,
GraphSpaceID spaceId,
int32_t concurrency) {
cpp2::LeaderTransferReq req;
req.space_id_ref() = spaceId;
req.host_ref() = host;
req.concurrency_ref() = concurrency;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_leaderTransfer(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}


folly::Future<StatusOr<bool>> MetaClient::regConfig(const std::vector<cpp2::ConfigItem>& items) {
memory::MemoryCheckOffGuard g;
cpp2::RegConfigReq req;
Expand Down
4 changes: 4 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ class MetaClient : public BaseMetaClient {

folly::Future<StatusOr<std::vector<cpp2::RoleItem>>> getUserRoles(std::string account);

folly::Future<StatusOr<bool>> transferLeader(const HostAddr& host,
GraphSpaceID spaceId,
int32_t concurrency);

// Operations for config
folly::Future<StatusOr<bool>> regConfig(const std::vector<cpp2::ConfigItem>& items);

Expand Down
14 changes: 7 additions & 7 deletions src/common/base/ObjectPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ class ObjectPool final : private boost::noncopyable, private cpp::NonMovable {
return objects_.empty();
}

template <typename T>
T *add(T *obj) {
SLGuard g(lock_);
objects_.emplace_back(obj);
return obj;
}

private:
// Holder the ownership of the any object
class OwnershipHolder {
Expand All @@ -68,13 +75,6 @@ class ObjectPool final : private boost::noncopyable, private cpp::NonMovable {
std::function<void(void *)> deleteFn_;
};

template <typename T>
T *add(T *obj) {
SLGuard g(lock_);
objects_.emplace_back(obj);
return obj;
}

std::list<OwnershipHolder> objects_;
Arena arena_;

Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ nebula_add_library(
admin/SessionExecutor.cpp
admin/ShowQueriesExecutor.cpp
admin/KillQueryExecutor.cpp
admin/TransferLeaderExecutor.cpp
maintain/TagExecutor.cpp
maintain/TagIndexExecutor.cpp
maintain/EdgeExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "graph/executor/admin/SwitchSpaceExecutor.h"
#include "graph/executor/admin/UpdateUserExecutor.h"
#include "graph/executor/admin/ZoneExecutor.h"
#include "graph/executor/admin/TransferLeaderExecutor.h"
#include "graph/executor/algo/AllPathsExecutor.h"
#include "graph/executor/algo/BFSShortestPathExecutor.h"
#include "graph/executor/algo/CartesianProductExecutor.h"
Expand Down Expand Up @@ -452,6 +453,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kShowHosts: {
return pool->makeAndAdd<ShowHostsExecutor>(node, qctx);
}
case PlanNode::Kind::kTransferLeader: {
return pool->add(new TransferLeaderExecutor(node, qctx));
}
case PlanNode::Kind::kShowMetaLeader: {
return pool->makeAndAdd<ShowMetaLeaderExecutor>(node, qctx);
}
Expand Down
47 changes: 47 additions & 0 deletions src/graph/executor/admin/TransferLeaderExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//
// Created by fujie on 24-10-21.
//

#include "TransferLeaderExecutor.h"

#include "clients/meta/MetaClient.h"
#include "common/time/ScopedTimer.h"
#include "graph/context/QueryContext.h"
#include "graph/planner/plan/Admin.h"
#include "graph/planner/plan/Query.h"
#include "graph/service/PermissionManager.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
namespace graph {

folly::Future<Status> TransferLeaderExecutor::execute() {
SCOPED_TIMER(&execTime_);
return transferLeader();
}

folly::Future<Status> TransferLeaderExecutor::transferLeader() {
auto *transferLeaderNode = asNode<TransferLeader>(node());
auto spaceName = transferLeaderNode->spaceName();
auto spaceIdResult = qctx()->getMetaClient()->getSpaceIdByNameFromCache(spaceName);
NG_RETURN_IF_ERROR(spaceIdResult);
auto spaceId = spaceIdResult.value();
auto *session = qctx_->rctx()->session();
NG_RETURN_IF_ERROR(PermissionManager::canWriteSpace(session));

return qctx()
->getMetaClient()
->transferLeader(transferLeaderNode->address(), spaceId, transferLeaderNode->concurrency())
.via(runner())
.thenValue([this](StatusOr<bool> resp) {
SCOPED_TIMER(&execTime_);
NG_RETURN_IF_ERROR(resp);
if (!resp.value()) {
return Status::Error("Transfer leader failed!");
}
return Status::OK();
});
}

} // namespace graph
} // namespace nebula
27 changes: 27 additions & 0 deletions src/graph/executor/admin/TransferLeaderExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// Created by fujie on 24-10-21.
//

#ifndef NEBULA_GRAPH_TRANSFERLEADEREXECUTOR_H
#define NEBULA_GRAPH_TRANSFERLEADEREXECUTOR_H

#include "graph/executor/Executor.h"

namespace nebula {
namespace graph {

class TransferLeaderExecutor final : public Executor {
public:
TransferLeaderExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("TransferLeaderExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
folly::Future<Status> transferLeader();
};

} // namespace graph
} // namespace nebula

#endif // NEBULA_GRAPH_TRANSFERLEADEREXECUTOR_H
36 changes: 36 additions & 0 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,42 @@ class SubmitJob final : public SingleDependencyNode {
const std::vector<std::string> params_;
};

class TransferLeader final : public SingleDependencyNode {
public:
static TransferLeader* make(QueryContext* qctx,
PlanNode* input,
HostAddr address,
std::string spaceName,
int32_t concurrency) {
return qctx->objPool()->add(
new TransferLeader(qctx, input, std::move(address), std::move(spaceName), concurrency));
}

const std::string& spaceName() const { return spaceName_; }

const HostAddr& address() const { return address_; }

int32_t concurrency() const { return concurrency_; }

private:
TransferLeader(QueryContext* qctx,
PlanNode* input,
HostAddr address,
std::string spaceName,
int32_t concurrency)
: SingleDependencyNode(qctx, Kind::kTransferLeader, input) {
spaceName_ = std::move(spaceName);
address_ = std::move(address);
concurrency_ = concurrency;
}

private:
std::string spaceName_;
HostAddr address_;
int32_t concurrency_;
};


class ShowCharset final : public SingleDependencyNode {
public:
static ShowCharset* make(QueryContext* qctx, PlanNode* input) {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "AddHosts";
case Kind::kDropHosts:
return "DropHosts";
case Kind::kTransferLeader:
return "TransferLeader";
// Zone
case Kind::kMergeZone:
return "MergeZone";
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class PlanNode {
kDivideZone,
kAddHosts,
kDropHosts,
kTransferLeader,
kDescribeZone,
kAddHostsIntoZone,

Expand Down
3 changes: 2 additions & 1 deletion src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace graph {
* kAlterEdge, kDropTag, kDropEdge,
* kCreateTagIndex, kCreateEdgeIndex, kDropTagIndex,
* kDropEdgeIndex, Read user : Write user : kCreateUser, kDropUser, kAlterUser,
* Write role : kGrant, kRevoke,
* Write role : kGrant, kRevoke, kTransferLeader
* Read data : kGo , kSet, kPipe, kMatch, kAssignment, kLookup,
* kYield, kOrderBy, kFetchVertices, kFind
* kFetchEdges, kFindPath, kLimit, KGroupBy, kReturn
Expand Down Expand Up @@ -66,6 +66,7 @@ namespace graph {
case Sentence::Kind::kDescribeZone:
case Sentence::Kind::kListZones:
case Sentence::Kind::kAddHostsIntoZone:
case Sentence::Kind::kTransferLeader:
case Sentence::Kind::kShowConfigs:
case Sentence::Kind::kSetConfig:
case Sentence::Kind::kGetConfig:
Expand Down
15 changes: 15 additions & 0 deletions src/graph/validator/AdminValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,5 +712,20 @@ Status KillQueryValidator::toPlan() {
tail_ = root_;
return Status::OK();
}

Status TransferLeaderValidator::validateImpl() { return Status::OK(); }

Status TransferLeaderValidator::toPlan() {
auto sentence = static_cast<TransferLeaderSentence *>(sentence_);
auto *node = TransferLeader::make(qctx_,
nullptr,
*sentence->address(),
*sentence->spaceName(),
sentence->concurrency());
root_ = node;
tail_ = root_;
return Status::OK();
}

} // namespace graph
} // namespace nebula
14 changes: 14 additions & 0 deletions src/graph/validator/AdminValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,20 @@ class KillQueryValidator final : public Validator {

Status toPlan() override;
};

class TransferLeaderValidator final : public Validator {
public:
TransferLeaderValidator(Sentence* sentence, QueryContext* context)
: Validator(sentence, context) {
setNoSpaceRequired();
}

private:
Status validateImpl() override;

Status toPlan() override;
};

} // namespace graph
} // namespace nebula
#endif // GRAPH_VALIDATOR_ADMINVALIDATOR_H_
2 changes: 2 additions & 0 deletions src/graph/validator/Validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon
return std::make_unique<ShowRolesInSpaceValidator>(sentence, context);
case Sentence::Kind::kDescribeUser:
return std::make_unique<DescribeUserValidator>(sentence, context);
case Sentence::Kind::kTransferLeader:
return std::make_unique<TransferLeaderValidator>(sentence, context);
case Sentence::Kind::kAdminJob:
case Sentence::Kind::kAdminShowJobs:
return std::make_unique<AdminJobValidator>(sentence, context);
Expand Down
9 changes: 9 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,13 @@ struct BalanceTask {
5: i64 stop_time,
}

struct LeaderTransferReq {
1: common.HostAddr host,
2: common.GraphSpaceID space_id,
3: i32 concurrency,
}


enum ConfigModule {
UNKNOWN = 0x00,
ALL = 0x01,
Expand Down Expand Up @@ -1262,6 +1269,8 @@ service MetaService {
HBResp heartBeat(1: HBReq req);
AgentHBResp agentHeartbeat(1: AgentHBReq req);

ExecResp leaderTransfer(1: LeaderTransferReq req);

ExecResp regConfig(1: RegConfigReq req);
GetConfigResp getConfig(1: GetConfigReq req);
ExecResp setConfig(1: SetConfigReq req);
Expand Down
2 changes: 2 additions & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ nebula_add_library(
processors/admin/GetMetaDirInfoProcessor.cpp
processors/admin/VerifyClientVersionProcessor.cpp
processors/admin/SaveGraphVersionProcessor.cpp
processors/job/LeaderTransferProcessor.cpp
processors/job/LeaderTransfer.cpp
processors/config/RegConfigProcessor.cpp
processors/config/GetConfigProcessor.cpp
processors/config/ListConfigsProcessor.cpp
Expand Down
9 changes: 9 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "meta/processors/zone/ListZonesProcessor.h"
#include "meta/processors/zone/MergeZoneProcessor.h"
#include "meta/processors/zone/RenameZoneProcessor.h"
#include "meta/processors/job/LeaderTransferProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -561,5 +562,13 @@ folly::Future<cpp2::GetSegmentIdResp> MetaServiceHandler::future_getSegmentId(
auto* processor = GetSegmentIdProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}


folly::Future<cpp2::ExecResp> MetaServiceHandler::future_leaderTransfer(
const cpp2::LeaderTransferReq& req) {
auto* processor = LeaderTransferProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

} // namespace meta
} // namespace nebula
2 changes: 2 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::GetSegmentIdResp> future_getSegmentId(
const cpp2::GetSegmentIdReq& req) override;

folly::Future<cpp2::ExecResp> future_leaderTransfer(const cpp2::LeaderTransferReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
ClusterID clusterId_{0};
Expand Down
Loading