diff --git a/include/common/common.h b/include/common/common.h index eded25c9..9be724b1 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -1337,6 +1337,7 @@ void stripslashes(std::string& str, bool is_gbk); extern void update_schema_conf_common(const std::string& table_name, const pb::SchemaConf& schema_conf, pb::SchemaConf* p_conf); extern void update_op_version(pb::SchemaConf* p_conf, const std::string& desc); extern int primitive_to_proto_type(pb::PrimitiveType type); +extern int primitive_type_bytes_len(pb::PrimitiveType type); extern int get_physical_room(const std::string& ip_and_port_str, std::string& host); extern int get_instance_from_bns(int* ret, const std::string& bns_name, diff --git a/include/common/histogram.h b/include/common/histogram.h index aa3c9400..1d8fb5ee 100644 --- a/include/common/histogram.h +++ b/include/common/histogram.h @@ -302,7 +302,9 @@ class PacketSample { sample_sorter.insert_row(batch->get_row().get()); } batch->reset(); - _batch_vector.push_back(batch); + if (batch->size() > 0) { + _batch_vector.push_back(batch); + } } while (!eos); sample_sorter.insert_done(); diff --git a/include/exec/dml_node.h b/include/exec/dml_node.h index 839b9476..048387f4 100644 --- a/include/exec/dml_node.h +++ b/include/exec/dml_node.h @@ -28,7 +28,7 @@ class DMLNode : public ExecNode { virtual void find_place_holder(std::unordered_multimap& placeholders); int insert_row(RuntimeState* state, SmartRecord record, bool is_update = false); int delete_row(RuntimeState* state, SmartRecord record, MemRow* row); - int get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row); + int get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row, int64_t& ttl_ts); int remove_row(RuntimeState* state, SmartRecord record, const std::string& pk_str, bool delete_primary = true); int update_row(RuntimeState* state, SmartRecord record, MemRow* row); diff --git a/include/exec/exec_node.h b/include/exec/exec_node.h index 364af88d..606b487a 100644 --- a/include/exec/exec_node.h +++ b/include/exec/exec_node.h @@ -233,6 +233,9 @@ class ExecNode { } void set_limit(int64_t limit) { _limit = limit; + if (_limit < 0) { + _limit = -1; + } } virtual void reset_limit(int64_t limit) { _limit = limit; diff --git a/include/exec/filter_node.h b/include/exec/filter_node.h index 2e604c05..2c0622dd 100644 --- a/include/exec/filter_node.h +++ b/include/exec/filter_node.h @@ -107,6 +107,15 @@ class FilterNode : public ExecNode { } } + bool is_empty_filter() { + //只有scan_node做索引条件的时候会用_pruned_conjuncts + if (_children.size() > 0 && _children[0]->node_type() == pb::SCAN_NODE) { + return _pruned_conjuncts.empty(); + } else { + return _conjuncts.empty(); + } + } + private: bool need_copy(MemRow* row); private: diff --git a/include/logical_plan/query_context.h b/include/logical_plan/query_context.h index 8f0b35ab..993023ec 100644 --- a/include/logical_plan/query_context.h +++ b/include/logical_plan/query_context.h @@ -192,6 +192,7 @@ class QueryContext { return plan.add_nodes(); } int create_plan_tree(); + int destroy_plan_tree(); void add_sub_ctx(std::shared_ptr& ctx) { std::unique_lock lck(_kill_lock); diff --git a/include/meta_server/table_manager.h b/include/meta_server/table_manager.h index 377aa76f..0954f94d 100644 --- a/include/meta_server/table_manager.h +++ b/include/meta_server/table_manager.h @@ -984,11 +984,11 @@ class TableManager { auto src_type = src_field.mysql_type(); auto target_type = target_field.mysql_type(); if (src_type == target_type) { -// if (src_type == pb::DATETIME || src_type == pb::FLOAT || src_type == pb::DOUBLE) { -// if (src_field.float_precision_len() > target_field.float_precision_len()) { -// return false; -// } -// } + if (src_type == pb::DATETIME) { + if (src_field.float_precision_len() > target_field.float_precision_len()) { + return false; + } + } return true; } switch (src_type) { @@ -1007,11 +1007,14 @@ class TableManager { } int s = primitive_to_proto_type(src_type); int t = primitive_to_proto_type(target_type); - if (s == t) return true; - if (s == FieldDescriptorProto::TYPE_SINT32 && t == FieldDescriptorProto::TYPE_SINT64) return true; - if (s == FieldDescriptorProto::TYPE_SINT64 && t == FieldDescriptorProto::TYPE_SINT32) return true; - if (s == FieldDescriptorProto::TYPE_UINT32 && t == FieldDescriptorProto::TYPE_UINT64) return true; - if (s == FieldDescriptorProto::TYPE_UINT64 && t == FieldDescriptorProto::TYPE_UINT32) return true; + if (s != t) return false; + if (primitive_type_bytes_len(src_type) <= primitive_type_bytes_len(target_type)) { + return true; + } + // if (s == FieldDescriptorProto::TYPE_SINT32 && t == FieldDescriptorProto::TYPE_SINT64) return true; + // if (s == FieldDescriptorProto::TYPE_SINT64 && t == FieldDescriptorProto::TYPE_SINT32) return true; + // if (s == FieldDescriptorProto::TYPE_UINT32 && t == FieldDescriptorProto::TYPE_UINT64) return true; + // if (s == FieldDescriptorProto::TYPE_UINT64 && t == FieldDescriptorProto::TYPE_UINT32) return true; return false; } int get_index_state(int64_t table_id, int64_t index_id, pb::IndexState& index_state) { diff --git a/include/physical_plan/join_reorder.h b/include/physical_plan/join_reorder.h index 4e835526..db6cb561 100644 --- a/include/physical_plan/join_reorder.h +++ b/include/physical_plan/join_reorder.h @@ -20,6 +20,7 @@ namespace baikaldb { class JoinReorder { public: int analyze(QueryContext* ctx); + int reorder(QueryContext* ctx, ExecNode* node); }; } diff --git a/include/sqlparser/utils.h b/include/sqlparser/utils.h index 7920b961..2e2fb938 100644 --- a/include/sqlparser/utils.h +++ b/include/sqlparser/utils.h @@ -88,6 +88,7 @@ struct String { size_t fast = 0; bool has_slash = false; static std::unordered_map trans_map = { + {'0', '\x00'}, {'\\', '\\'}, {'\"', '\"'}, {'\'', '\''}, @@ -96,7 +97,6 @@ struct String { {'n', '\n'}, {'b', '\b'}, {'Z', '\x1A'}, - {'0', '\0'}, }; while (fast < length) { if (has_slash) { diff --git a/proto/plan.proto b/proto/plan.proto index 4d07b98f..1e23b18f 100755 --- a/proto/plan.proto +++ b/proto/plan.proto @@ -280,6 +280,7 @@ message UpdateNode { repeated SlotDescriptor update_slots = 3; repeated Expr update_exprs = 4; //repeated int64 affect_index_ids = 5; + optional int64 row_ttl_duration = 6; //row ttl support, compatible whit prepared stmt }; message PacketNode { diff --git a/src/common/common.cpp b/src/common/common.cpp index 694fc6ff..7a467e98 100644 --- a/src/common/common.cpp +++ b/src/common/common.cpp @@ -321,6 +321,7 @@ void stripslashes(std::string& str, bool is_gbk) { size_t fast = 0; bool has_slash = false; static std::unordered_map trans_map = { + {'0', '\x00'}, {'\\', '\\'}, {'\"', '\"'}, {'\'', '\''}, @@ -439,6 +440,23 @@ void update_schema_conf_common(const std::string& table_name, const pb::SchemaCo DB_WARNING("%s schema conf UPDATE TO : %s", table_name.c_str(), schema_conf.ShortDebugString().c_str()); } +int primitive_type_bytes_len(pb::PrimitiveType type) { + static std::unordered_map _mysql_pb_type_bytes_count = { + { pb::INT8, 1}, + { pb::INT16, 2}, + { pb::INT32, 3}, + { pb::INT64, 4}, + { pb::UINT8, 1}, + { pb::UINT16, 2}, + { pb::UINT32, 3}, + { pb::UINT64, 4} + }; + if (_mysql_pb_type_bytes_count.count(type) == 0) { + return 0xFFFF; + } + return _mysql_pb_type_bytes_count[type]; +} + int primitive_to_proto_type(pb::PrimitiveType type) { using google::protobuf::FieldDescriptorProto; static std::unordered_map _mysql_pb_type_mapping = { diff --git a/src/common/information_schema.cpp b/src/common/information_schema.cpp index a5d39fab..d9f57b07 100644 --- a/src/common/information_schema.cpp +++ b/src/common/information_schema.cpp @@ -1216,7 +1216,7 @@ void InformationSchema::init_tables() { record->set_value(record->get_field_by_name("CREATE_TIME"), ct.cast_to(pb::DATETIME)); record->set_string(record->get_field_by_name("TABLE_COLLATION"), coll); record->set_string(record->get_field_by_name("CREATE_OPTIONS"), ""); - record->set_string(record->get_field_by_name("TABLE_COMMENT"), ""); + record->set_string(record->get_field_by_name("TABLE_COMMENT"), table_info->comment); record->set_int64(record->get_field_by_name("TABLE_ID"), table_info->id); records.emplace_back(record); } diff --git a/src/exec/access_path.cpp b/src/exec/access_path.cpp index 4757fa0e..deac5094 100755 --- a/src/exec/access_path.cpp +++ b/src/exec/access_path.cpp @@ -92,7 +92,8 @@ bool AccessPath::check_sort_use_index(Property& sort_property) { std::vector& order_exprs = sort_property.slot_order_exprs; SlotRef* slot_ref = static_cast(order_exprs[0]); size_t idx = 0; - auto& fields = index_info_ptr->fields; + std::vectorfields(index_info_ptr->fields.begin(), index_info_ptr->fields.end()); + fields.insert(fields.end(),index_info_ptr->pk_fields.begin(), index_info_ptr->pk_fields.end()); for (; idx < fields.size(); ++idx) { if (tuple_id == slot_ref->tuple_id() && fields[idx].id == slot_ref->field_id()) { break; diff --git a/src/exec/dml_node.cpp b/src/exec/dml_node.cpp index f93c0d78..a3ad90ad 100644 --- a/src/exec/dml_node.cpp +++ b/src/exec/dml_node.cpp @@ -506,7 +506,7 @@ int DMLNode::insert_row(RuntimeState* state, SmartRecord record, bool is_update) return ++affected_rows; } -int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row) { +int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row, int64_t& ttl_ts) { int ret = 0; MutTableKey pk_key; ret = record->encode_key(*_pri_info, pk_key, -1, false); @@ -522,12 +522,12 @@ int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string* record->decode_key(*_pri_info, *pk_str); } //delete requires all fields (index and non-index fields) - ret = _txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_LOCK, true); + ret = _txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_LOCK, true, ttl_ts); if (ret < 0) { return ret; } if (row != nullptr && _tuple_desc != nullptr - && (_node_type == pb::DELETE_NODE || _node_type == pb::UPDATE_NODE)) { + && (_node_type == pb::DELETE_NODE || _node_type == pb::UPDATE_NODE || _node_type == pb::LOCK_PRIMARY_NODE)) { for (auto slot : _tuple_desc->slots()) { auto field = record->get_field_by_tag(slot.field_id()); row->set_value(slot.tuple_id(), slot.slot_id(), @@ -637,7 +637,8 @@ int DMLNode::remove_row(RuntimeState* state, SmartRecord record, int DMLNode::delete_row(RuntimeState* state, SmartRecord record, MemRow* row) { int ret = 0; std::string pk_str; - ret = get_lock_row(state, record, &pk_str, row); + int64_t ttl_ts = 0; + ret = get_lock_row(state, record, &pk_str, row, ttl_ts); if (ret == -3) { //DB_WARNING_STATE(state, "key not in this region:%ld", _region_id); return 0; @@ -673,7 +674,8 @@ bool DMLNode::satisfy_condition_again(RuntimeState* state, MemRow* row) { int DMLNode::update_row(RuntimeState* state, SmartRecord record, MemRow* row) { int ret = 0; std::string pk_str; - ret = get_lock_row(state, record, &pk_str, row); + int64_t ttl_ts = 0; + ret = get_lock_row(state, record, &pk_str, row, ttl_ts); if (ret == -3) { //DB_WARNING_STATE(state, "key not in this region:%ld", _region_id); return 0; @@ -689,6 +691,13 @@ int DMLNode::update_row(RuntimeState* state, SmartRecord record, MemRow* row) { // UndoGetForUpdate(pk_str)? 同一个txn GetForUpdate与UndoGetForUpdate之间不要写pk_str return 0; } + // _row_ttl_duration == -1 代表保持原ttl意思 + // TODO: 全局索引 keep ttl功能 + if (_row_ttl_duration == -1 && _ttl_timestamp_us > 0 && ttl_ts > 0) { + _ttl_timestamp_us = ttl_ts; + _txn->set_write_ttl_timestamp_us(_ttl_timestamp_us); + DB_DEBUG("keep ttl_timestamp_us: %ld", _ttl_timestamp_us); + } _indexes_ptr = &_affected_indexes; // 影响了主键需要删除旧的行 ret = remove_row(state, record, pk_str, _update_affect_primary); diff --git a/src/exec/fetcher_store.cpp b/src/exec/fetcher_store.cpp index 0fcb07a4..65e26e16 100755 --- a/src/exec/fetcher_store.cpp +++ b/src/exec/fetcher_store.cpp @@ -386,6 +386,7 @@ void OnRPCDone::select_addr() { if (peer_index < sorted_peers.size()) { _addr = sorted_peers[peer_index]; DB_WARNING("choose peer %s, index: %ld", _addr.c_str(), peer_index); + FetcherStore::choose_other_if_dead(_info, _addr); } } _request.set_select_without_leader(true); @@ -1015,15 +1016,21 @@ void FetcherStore::choose_other_if_dead(pb::RegionInfo& info, std::string& addr) } std::vector normal_peers; + std::vector other_peers; for (auto& peer: info.peers()) { auto status = schema_factory->get_instance_status(peer); if (status.status == pb::NORMAL) { normal_peers.push_back(peer); + } else if (status.status != pb::DEAD) { + other_peers.push_back(peer); } } if (normal_peers.size() > 0) { uint32_t i = butil::fast_rand() % normal_peers.size(); addr = normal_peers[i]; + } else if(other_peers.size() > 0) { + uint32_t i = butil::fast_rand() % other_peers.size(); + addr = other_peers[i]; } else { DB_DEBUG("all peer faulty, %ld", info.region_id()); } diff --git a/src/exec/lock_primary_node.cpp b/src/exec/lock_primary_node.cpp index 6c96aa9e..0dbf2f96 100755 --- a/src/exec/lock_primary_node.cpp +++ b/src/exec/lock_primary_node.cpp @@ -223,7 +223,8 @@ int LockPrimaryNode::open(RuntimeState* state) { case pb::LOCK_GET_DML: { for (auto& record : delete_records) { //DB_WARNING_STATE(state,"record:%s", record->debug_string().c_str()); - ret = delete_row(state, record, nullptr); + std::unique_ptr row = state->mem_row_desc()->fetch_mem_row(); + ret = delete_row(state, record, row.get()); if (ret < 0) { DB_WARNING_STATE(state, "delete_row fail"); return -1; diff --git a/src/exec/packet_node.cpp b/src/exec/packet_node.cpp index b9d83200..10195f7f 100644 --- a/src/exec/packet_node.cpp +++ b/src/exec/packet_node.cpp @@ -446,6 +446,10 @@ int PacketNode::open(RuntimeState* state) { } _send_buf = state->send_buf(); int ret = 0; + if (state->explain_type == EXPLAIN_SHOW_COST) { + handle_show_cost(state); + return 0; + } if (!_return_empty || op_type() == pb::OP_SELECT) { ret = ExecNode::open(state); if (ret < 0) { @@ -484,10 +488,6 @@ int PacketNode::open(RuntimeState* state) { } return 0; } - if (state->explain_type == EXPLAIN_SHOW_COST) { - handle_show_cost(state); - return 0; - } state->set_num_affected_rows(ret); if (op_type() != pb::OP_SELECT && op_type() != pb::OP_UNION) { pack_ok(state->num_affected_rows(), _client); @@ -682,7 +682,9 @@ int PacketNode::open_analyze(RuntimeState* state) { return ret; } state->inc_num_returned_rows(batch->size()); - batch_vector.push_back(batch); + if (batch->size() > 0) { + batch_vector.push_back(batch); + } } while (!eos); std::vector slot_order_exprs; diff --git a/src/exec/update_node.cpp b/src/exec/update_node.cpp index fa0dcc79..f8409c57 100644 --- a/src/exec/update_node.cpp +++ b/src/exec/update_node.cpp @@ -29,6 +29,8 @@ int UpdateNode::init(const pb::PlanNode& node) { } _table_id = node.derive_node().update_node().table_id(); _global_index_id = _table_id; + _row_ttl_duration = node.derive_node().update_node().row_ttl_duration(); + DB_DEBUG("_row_ttl_duration:%ld", _row_ttl_duration); _primary_slots.clear(); _primary_slots.reserve(node.derive_node().update_node().primary_slots_size()); for (auto& slot : node.derive_node().update_node().primary_slots()) { diff --git a/src/logical_plan/insert_planner.cpp b/src/logical_plan/insert_planner.cpp index 8bd0bccf..a69fcb55 100644 --- a/src/logical_plan/insert_planner.cpp +++ b/src/logical_plan/insert_planner.cpp @@ -36,7 +36,7 @@ int InsertPlanner::plan() { insert->set_need_ignore(_insert_stmt->is_ignore); insert->set_is_replace(_insert_stmt->is_replace); insert->set_is_merge(_insert_stmt->is_merge); - if (_ctx->row_ttl_duration > 0) { + if (_ctx->row_ttl_duration > 0 || _ctx->row_ttl_duration == -1) { insert->set_row_ttl_duration(_ctx->row_ttl_duration); DB_DEBUG("row_ttl_duration: %ld", _ctx->row_ttl_duration); } diff --git a/src/logical_plan/logical_planner.cpp b/src/logical_plan/logical_planner.cpp index 74464ec8..bf4785c6 100644 --- a/src/logical_plan/logical_planner.cpp +++ b/src/logical_plan/logical_planner.cpp @@ -543,49 +543,25 @@ int LogicalPlanner::generate_sql_sign(QueryContext* ctx, parser::StmtNode* stmt) pb::ScanNode* scan = derive->mutable_scan_node(); int64_t table_id = scan->table_id(); if (table_index_map.count(table_id) > 0) { - for (auto& index_name: table_index_map[table_id]) { - int64_t index_id = 0; - auto ret = _factory->get_index_id(table_id, index_name, index_id); - if (ret != 0) { - DB_WARNING("index_name: %s in table:%s not exist", index_name.c_str(), - _factory->get_table_info_ptr(table_id)->name.c_str()); - continue; - } - scan->add_force_indexes(index_id); - } - } - } - } - } - } - - if (!ctx->sign_forceindex.empty()) { - if (ctx->sign_forceindex.count(stat_info->sign) > 0) { - auto& table_index_map = ctx->sign_forceindex[stat_info->sign]; - for (int i = 0; i < ctx->plan.nodes_size(); i++) { - auto node = ctx->plan.mutable_nodes(i); - if (node->node_type() != pb::SCAN_NODE) { - continue; - } - pb::DerivePlanNode* derive = node->mutable_derive_node(); - pb::ScanNode* scan = derive->mutable_scan_node(); - int64_t table_id = scan->table_id(); - if (table_index_map.count(table_id) > 0) { - for (auto& index_name: table_index_map[table_id]) { - int64_t index_id = 0; - auto ret = _factory->get_index_id(table_id, index_name, index_id); - if (ret != 0) { - DB_WARNING("index_name: %s in table:%s not exist", index_name.c_str(), - _factory->get_table_info_ptr(table_id)->name.c_str()); - continue; + for (auto& index_name: table_index_map[table_id]) { + int64_t index_id = 0; + auto ret = _factory->get_index_id(table_id, index_name, index_id); + if (ret != 0) { + DB_WARNING("index_name: %s in table:%s not exist", index_name.c_str(), + _factory->get_table_info_ptr(table_id)->name.c_str()); + continue; + } + auto indexInfoPtr = _factory->get_index_info_ptr(index_id); + if (indexInfoPtr == nullptr || indexInfoPtr->state != pb::IS_PUBLIC || indexInfoPtr->index_hint_status != pb::IHS_NORMAL) { + continue; + } + scan->add_force_indexes(index_id); } - scan->add_force_indexes(index_id); } } } } } - return 0; } @@ -634,19 +610,19 @@ int LogicalPlanner::gen_subquery_plan(parser::DmlNode* subquery, const SmartPlan } } _cur_sub_ctx->expr_params.row_filed_number = planner->select_names().size(); - int ret = _cur_sub_ctx->create_plan_tree(); - if (ret < 0) { - DB_WARNING("Failed to pb_plan to execnode"); - return -1; - } _ctx->set_kill_ctx(_cur_sub_ctx); auto stat_info = &(_cur_sub_ctx->stat_info); - ret = generate_sql_sign(_cur_sub_ctx.get(), subquery); + int ret = generate_sql_sign(_cur_sub_ctx.get(), subquery); if (ret < 0) { return -1; } auto& client_conn = _ctx->client_conn; client_conn->insert_subquery_sign(stat_info->sign); + ret = _cur_sub_ctx->create_plan_tree(); + if (ret < 0) { + DB_WARNING("Failed to pb_plan to execnode"); + return -1; + } return 0; } @@ -782,20 +758,6 @@ int LogicalPlanner::add_table(const std::string& database, const std::string& ta } } - _ctx->sign_blacklist.insert(tbl_ptr->sign_blacklist.begin(), tbl_ptr->sign_blacklist.end()); - _ctx->sign_forcelearner.insert(tbl_ptr->sign_forcelearner.begin(), tbl_ptr->sign_forcelearner.end()); - for (auto& sign_index : tbl_ptr->sign_forceindex) { - std::vector vec; - boost::split(vec, sign_index, boost::is_any_of(":")); - if (vec.size() != 2) { - continue; - } - uint64_t sign_num = strtoull(vec[0].c_str(), nullptr, 10); - auto& table_index_map = _ctx->sign_forceindex[sign_num]; - auto& force_index_set = table_index_map[tableid]; - force_index_set.insert(vec[1]); - } - // 通用降级路由 // 复杂sql(join和子查询)不降级 if (MetaServerInteract::get_backup_instance()->is_inited() && tbl_ptr->have_backup && !_ctx->is_complex && @@ -882,6 +844,22 @@ int LogicalPlanner::add_table(const std::string& database, const std::string& ta _partition_names.clear(); } _ctx->stat_info.table_id = tableid; + auto tbl_ptr = _factory->get_table_info_ptr(tableid); + _ctx->sign_blacklist.insert(tbl_ptr->sign_blacklist.begin(), tbl_ptr->sign_blacklist.end()); + _ctx->sign_forcelearner.insert(tbl_ptr->sign_forcelearner.begin(), tbl_ptr->sign_forcelearner.end()); + for (auto& sign_index : tbl_ptr->sign_forceindex) { + std::vector vec; + boost::split(vec, sign_index, boost::is_any_of(":")); + if (vec.size() != 2) { + continue; + } + uint64_t sign_num = strtoull(vec[0].c_str(), nullptr, 10); + auto& table_index_map = _ctx->sign_forceindex[sign_num]; + auto& force_index_set = table_index_map[tableid]; + force_index_set.insert(vec[1]); + } + + ScanTupleInfo* tuple_info = get_scan_tuple(alias_full_name, tableid); _ctx->current_tuple_ids.emplace(tuple_info->tuple_id); _ctx->current_table_tuple_ids.emplace(tuple_info->tuple_id); @@ -1233,6 +1211,10 @@ int LogicalPlanner::create_join_node_from_terminator(const std::string db, index_name.c_str(), alias_full_name.c_str()); return -1; } + auto indexInfoPtr = _factory->get_index_info_ptr(index_id); + if (indexInfoPtr == nullptr || indexInfoPtr->state != pb::IS_PUBLIC || indexInfoPtr->index_hint_status != pb::IHS_NORMAL) { + continue; + } (*join_root_ptr)->force_indexes.insert(index_id); } for (auto& index_name : ignore_index_names) { diff --git a/src/logical_plan/prepare_planner.cpp b/src/logical_plan/prepare_planner.cpp index d2d14362..5f21794c 100644 --- a/src/logical_plan/prepare_planner.cpp +++ b/src/logical_plan/prepare_planner.cpp @@ -18,6 +18,7 @@ #include "insert_planner.h" #include "delete_planner.h" #include "update_planner.h" +#include "union_planner.h" #include "transaction_planner.h" #include "exec_node.h" #include "packet_node.h" @@ -188,6 +189,9 @@ int PreparePlanner::stmt_prepare(const std::string& stmt_name, const std::string case parser::NT_DELETE: planner.reset(new DeletePlanner(prepare_ctx.get())); break; + case parser::NT_UNION: + planner.reset(new UnionPlanner(prepare_ctx.get())); + break; default: DB_WARNING("un-supported prepare command type: %d", prepare_ctx->stmt_type); return -1; @@ -210,6 +214,9 @@ int PreparePlanner::stmt_prepare(const std::string& stmt_name, const std::string return -1; } prepare_ctx->root->find_place_holder(prepare_ctx->placeholders); + for (auto sub_query_ctx : prepare_ctx->sub_query_plans) { + sub_query_ctx->root->find_place_holder(prepare_ctx->placeholders); + } /* // 包括类型推导与常量表达式计算 ret = ExprOptimize().analyze(prepare_ctx.get()); @@ -257,8 +264,9 @@ int PreparePlanner::stmt_execute(const std::string& stmt_name, std::vectorcopy_query_context(prepare_ctx.get()); auto* p_placeholders = &prepare_ctx->placeholders; - if (!prepare_ctx->is_select) { - // TODO dml的plan复用 + + // TODO dml的plan复用 + if (!prepare_ctx->is_select || prepare_ctx->sub_query_plans.size() > 0) { // enable_2pc=true or table has global index need generate txn_id set_dml_txn_state(prepare_ctx->prepared_table_id); _ctx->plan.CopyFrom(prepare_ctx->plan); @@ -273,6 +281,17 @@ int PreparePlanner::stmt_execute(const std::string& stmt_name, std::vectorroot->find_place_holder(_ctx->placeholders); + for (auto sub_query_ctx : prepare_ctx->sub_query_plans) { + // stmt_prepare的plan()函数里已经生成过一次了,此处需要先释放,再生成 + sub_query_ctx->destroy_plan_tree(); + int ret = sub_query_ctx->create_plan_tree(); + if (ret < 0) { + DB_WARNING("Failed to pb_plan to execnode"); + return -1; + } + _ctx->add_sub_ctx(sub_query_ctx); + sub_query_ctx->root->find_place_holder(_ctx->placeholders); + } p_placeholders = &_ctx->placeholders; } if (p_placeholders == nullptr) { diff --git a/src/logical_plan/query_context.cpp b/src/logical_plan/query_context.cpp index 13ce6c51..ca953376 100644 --- a/src/logical_plan/query_context.cpp +++ b/src/logical_plan/query_context.cpp @@ -29,6 +29,13 @@ int QueryContext::create_plan_tree() { return ExecNode::create_tree(plan, &root); } +int QueryContext::destroy_plan_tree() { + need_destroy_tree = false; + ExecNode::destroy_tree(root); + root = nullptr; + return 0; +} + void QueryContext::update_ctx_stat_info(RuntimeState* state, int64_t query_total_time) { stat_info.num_returned_rows += state->num_returned_rows(); stat_info.num_affected_rows += state->num_affected_rows(); @@ -95,6 +102,12 @@ int QueryContext::copy_query_context(QueryContext* p_query_ctx) { sign_forcelearner.insert(p_query_ctx->sign_forcelearner.begin(), p_query_ctx->sign_forcelearner.end()); sign_forceindex.insert(p_query_ctx->sign_forceindex.begin(), p_query_ctx->sign_forceindex.end()); + has_derived_table = p_query_ctx->has_derived_table; + derived_table_ctx_mapping.insert(p_query_ctx->derived_table_ctx_mapping.begin(), + p_query_ctx->derived_table_ctx_mapping.end()); + slot_column_mapping.insert(p_query_ctx->slot_column_mapping.begin(), + p_query_ctx->slot_column_mapping.end()); + return 0; } diff --git a/src/logical_plan/setkv_planner.cpp b/src/logical_plan/setkv_planner.cpp index 74809d59..c4c78957 100644 --- a/src/logical_plan/setkv_planner.cpp +++ b/src/logical_plan/setkv_planner.cpp @@ -193,11 +193,12 @@ int SetKVPlanner::set_user_variable(const std::string& key, parser::ExprNode* ex DB_WARNING("expr open fail:%d", ret); return ret; } + bool is_constant = var_expr->is_constant(); ExprValue value = var_expr->get_value(nullptr); var_expr->close(); ExprNode::destroy_tree(var_expr); - if (var_expr->is_constant()) { + if (is_constant) { var_expr = new Literal(value); pb::ExprNode pb_node; var_expr->transfer_pb(&pb_node); diff --git a/src/logical_plan/update_planner.cpp b/src/logical_plan/update_planner.cpp index 607bdc05..0028d2f0 100644 --- a/src/logical_plan/update_planner.cpp +++ b/src/logical_plan/update_planner.cpp @@ -136,6 +136,10 @@ int UpdatePlanner::create_update_node(pb::PlanNode* update_node) { auto& slot = get_scan_ref_slot(try_to_lower(_current_tables[0]), table_id, field.id, field.type); update->add_primary_slots()->CopyFrom(slot); } + if (_ctx->row_ttl_duration > 0 || _ctx->row_ttl_duration == -1) { + update->set_row_ttl_duration(_ctx->row_ttl_duration); + DB_DEBUG("row_ttl_duration: %ld", _ctx->row_ttl_duration); + } return 0; } diff --git a/src/meta_server/meta_server.cpp b/src/meta_server/meta_server.cpp index 773e0d59..674ad65b 100644 --- a/src/meta_server/meta_server.cpp +++ b/src/meta_server/meta_server.cpp @@ -552,6 +552,13 @@ void MetaServer::raft_control(google::protobuf::RpcController* controller, pb::RaftControlResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = + static_cast(controller); + uint64_t log_id = 0; + if (cntl->has_log_id()) { + log_id = cntl->log_id(); + } + RETURN_IF_NOT_INIT(_init_success, response, log_id); if (request->region_id() == 0) { _meta_state_machine->raft_control(controller, request, response, done_guard.release()); return; diff --git a/src/meta_server/table_manager.cpp b/src/meta_server/table_manager.cpp index 16c6e0bb..e4ec47b5 100644 --- a/src/meta_server/table_manager.cpp +++ b/src/meta_server/table_manager.cpp @@ -1996,7 +1996,7 @@ void TableManager::rename_field(const pb::MetaManagerRequest& request, } int32_t field_id = 0; for (auto& mem_field : *mem_schema_pb.mutable_fields()) { - if (mem_field.field_name() == field.field_name()) { + if (!mem_field.deleted() && mem_field.field_name() == field.field_name()) { mem_field.set_field_name(field.new_field_name()); field_id = mem_field.field_id(); } @@ -2071,7 +2071,7 @@ void TableManager::modify_field(const pb::MetaManagerRequest& request, return; } for (auto& mem_field : *mem_schema_pb.mutable_fields()) { - if (mem_field.field_name() == field_name) { + if (!mem_field.deleted() && mem_field.field_name() == field_name) { if (field.has_mysql_type()) { if (!check_field_is_compatible_type(mem_field, field)) { // TODO 数据类型变更仅支持meta-only, 有损变更待支持 @@ -4364,7 +4364,7 @@ void TableManager::link_binlog(const pb::MetaManagerRequest& request, const int6 } if (request.table_info().has_link_field()) { for (const auto& field_info : mem_schema_pb.fields()) { - if (field_info.field_name() == request.table_info().link_field().field_name()) { + if (!field_info.deleted() && field_info.field_name() == request.table_info().link_field().field_name()) { link_field = field_info; get_field_info = true; break; diff --git a/src/physical_plan/join_reorder.cpp b/src/physical_plan/join_reorder.cpp index 066c7119..f1236b76 100644 --- a/src/physical_plan/join_reorder.cpp +++ b/src/physical_plan/join_reorder.cpp @@ -20,17 +20,28 @@ namespace baikaldb { int JoinReorder::analyze(QueryContext* ctx) { - JoinNode* join = static_cast(ctx->root->get_node(pb::JOIN_NODE)); + return reorder(ctx, ctx->root); +} +int JoinReorder::reorder(QueryContext* ctx, ExecNode* exec_node) { + JoinNode* join = static_cast(exec_node->get_node(pb::JOIN_NODE)); if (join == nullptr) { return 0; } + int ret = 0; std::map tuple_join_child_map; // join的所有非join孩子 std::map> tuple_equals_map; // 等值条件信息 std::vector tuple_order; // 目前join顺序 std::vector conditions; // join的全部条件,reorder需要重新下推 // 获取所有信息 if (!join->need_reorder(tuple_join_child_map, tuple_equals_map, tuple_order, conditions)) { - return 0; + if (join->children_size() == 2) { + ret = reorder(ctx, join->children(0)); + if (ret != 0) { + return ret; + } + ret = reorder(ctx, join->children(1)); + } + return ret; } ScanNode* first_node = static_cast( tuple_join_child_map[tuple_order[0]]->get_node(pb::SCAN_NODE)); diff --git a/src/physical_plan/limit_calc.cpp b/src/physical_plan/limit_calc.cpp index f65df29d..5e1f6abb 100644 --- a/src/physical_plan/limit_calc.cpp +++ b/src/physical_plan/limit_calc.cpp @@ -35,7 +35,7 @@ void LimitCalc::_analyze_limit(QueryContext* ctx, ExecNode* node, int64_t limit) case pb::TABLE_FILTER_NODE: case pb::WHERE_FILTER_NODE: { // 空filter可以下推 - if (static_cast(node)->pruned_conjuncts().empty()) { + if (static_cast(node)->is_empty_filter()) { break; } else { return; diff --git a/src/physical_plan/separate.cpp b/src/physical_plan/separate.cpp index 8870bcbb..6b0fb440 100644 --- a/src/physical_plan/separate.cpp +++ b/src/physical_plan/separate.cpp @@ -445,6 +445,7 @@ int Separate::separate_join(QueryContext* ctx, const std::vector& joi AggNode* agg_node = nullptr; SortNode* sort_node = nullptr; ExecNode* parent = join_node->get_parent(); + FilterNode* filter_node = nullptr; while (parent->node_type() != pb::JOIN_NODE && parent != ctx->root) { if (parent->node_type() == pb::LIMIT_NODE) { @@ -456,6 +457,9 @@ int Separate::separate_join(QueryContext* ctx, const std::vector& joi if (parent->node_type() == pb::SORT_NODE) { sort_node = static_cast(parent); } + if (parent->node_type() == pb::WHERE_FILTER_NODE) { + filter_node = static_cast(parent); + } parent = parent->get_parent(); } if (agg_node != nullptr) { @@ -487,6 +491,11 @@ int Separate::separate_join(QueryContext* ctx, const std::vector& joi } node = node->children(0); } + if (filter_node != nullptr) { + if (!filter_node->is_empty_filter()) { + need_pushdown = false; + } + } if (sort_node != nullptr) { for (auto expr : sort_node->slot_order_exprs()) { if(!join_node->expr_in_tuple_ids(*tuple_ids, expr)) { @@ -812,6 +821,9 @@ int Separate::separate_update(QueryContext* ctx) { if (ret < 0) { return -1; } + if (ctx->row_ttl_duration > 0) { + _row_ttl_duration = ctx->row_ttl_duration; + } int64_t main_table_id = update_node->table_id(); if (!need_separate_plan(ctx, main_table_id)) { auto region_infos = static_cast(scan_nodes[0])->region_infos();