Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: min/max消除, top n #249

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
14 changes: 12 additions & 2 deletions include/common/expr_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ struct ExprValue {
case pb::HLL:
case pb::HEX:
case pb::TDIGEST:
case pb::JSON:
str_val = value.string_val();
break;
case pb::BITMAP: {
Expand All @@ -190,6 +191,7 @@ struct ExprValue {
float_precision_len = -1;
str_val = value_str;
if (primitive_type == pb::STRING
|| primitive_type == pb::JSON
|| primitive_type == pb::HEX
|| primitive_type == pb::BITMAP
|| primitive_type == pb::HLL
Expand Down Expand Up @@ -323,6 +325,7 @@ struct ExprValue {
case pb::STRING:
case pb::HEX:
case pb::BITMAP:
case pb::JSON:
case pb::TDIGEST:
value->set_string_val(str_val);
break;
Expand Down Expand Up @@ -422,6 +425,7 @@ struct ExprValue {
case pb::STRING:
case pb::HEX:
case pb::HLL:
case pb::JSON:
case pb::TDIGEST:
return str_val.length();
case pb::DATETIME:
Expand Down Expand Up @@ -522,6 +526,9 @@ struct ExprValue {
case pb::STRING:
str_val = get_string();
break;
case pb::JSON:
str_val = get_string();
break;
case pb::BITMAP: {
_u.bitmap = new(std::nothrow) Roaring();
if (str_val.size() > 0) {
Expand Down Expand Up @@ -576,6 +583,7 @@ struct ExprValue {
butil::MurmurHash3_x64_128(&_u, 8, seed, out);
return out[0];
case pb::STRING:
case pb::JSON:
case pb::HEX: {
butil::MurmurHash3_x64_128(str_val.c_str(), str_val.size(), seed, out);
return out[0];
Expand Down Expand Up @@ -627,6 +635,7 @@ struct ExprValue {
}
case pb::STRING:
case pb::HEX:
case pb::JSON:
case pb::HLL:
case pb::TDIGEST:
return str_val;
Expand Down Expand Up @@ -741,6 +750,7 @@ struct ExprValue {
(_u.double_val < other._u.double_val ? -1 : 0);
case pb::STRING:
case pb::HEX:
case pb::JSON:
return str_val.compare(other.str_val);
case pb::NULL_TYPE:
return -1;
Expand Down Expand Up @@ -795,7 +805,7 @@ struct ExprValue {
}

bool is_string() const {
return type == pb::STRING || type == pb::HEX || type == pb::BITMAP || type == pb::HLL || type == pb::TDIGEST;
return type == pb::STRING || type == pb::HEX || type == pb::BITMAP || type == pb::HLL || type == pb::TDIGEST || type == pb::JSON;
}

bool is_double() const {
Expand Down Expand Up @@ -934,7 +944,7 @@ struct ExprValue {

struct HashFunction {
size_t operator()(const ExprValue& ev) const {
if (ev.type == pb::STRING || ev.type == pb::HEX) {
if (ev.type == pb::STRING || ev.type == pb::HEX || ev.type == pb::JSON) {
return ev.hash();
}
return ev._u.uint64_val;
Expand Down
4 changes: 4 additions & 0 deletions include/common/type_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ inline uint8_t to_mysql_type(pb::PrimitiveType type) {
case pb::BITMAP:
case pb::TDIGEST:
return MYSQL_TYPE_STRING;
case pb::JSON:
return MYSQL_TYPE_JSON;
default:
return MYSQL_TYPE_STRING;
}
Expand Down Expand Up @@ -399,6 +401,8 @@ inline std::string to_mysql_type_string(pb::PrimitiveType type) {
case pb::BITMAP:
case pb::TDIGEST:
return "binary";
case pb::JSON:
return "json";
default:
return "text";
}
Expand Down
15 changes: 15 additions & 0 deletions include/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,22 @@ class ExecNode {
bool is_get_keypoint() {
return _is_get_keypoint;
}
bool set_has_optimized(bool has_optimized) {
_has_optimized = has_optimized;
}
bool has_optimized() {
if (_has_optimized) {
return true;
}
for (auto child : _children) {
if (child->has_optimized()) {
return true;
}
}
return false;
}
protected:
bool _has_optimized = false;
int64_t _limit = -1;
int64_t _num_rows_returned = 0;
bool _is_explain = false;
Expand Down
3 changes: 3 additions & 0 deletions include/exec/sort_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class SortNode : public ExecNode {
for (auto expr : _slot_order_exprs) {
ExprNode::create_pb_expr(sort_node->add_slot_order_exprs(), expr);
}
if (_limit != -1) {
pb_node->set_limit(_limit);
}
}

void transfer_fetcher_pb(pb::FetcherNode* pb_fetcher) {
Expand Down
1 change: 1 addition & 0 deletions include/expr/expr_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class ExprNode {

// optimize or node to in node
static void or_node_optimize(ExprNode** expr_node);
static bool like_node_optimize(ExprNode** root, std::vector<ExprNode*>& new_exprs);
bool has_same_children();
bool is_vaild_or_optimize_tree(int32_t level, std::unordered_set<int32_t>* tuple_set);
static int change_or_node_to_in(ExprNode** expr_node);
Expand Down
6 changes: 6 additions & 0 deletions include/expr/internal_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ ExprValue pi(const std::vector<ExprValue>& input);
ExprValue greatest(const std::vector<ExprValue>& input);
ExprValue least(const std::vector<ExprValue>& input);
ExprValue pow(const std::vector<ExprValue>& input);
ExprValue bit_count(const std::vector<ExprValue>& input);
//string functions
ExprValue length(const std::vector<ExprValue>& input);
ExprValue bit_length(const std::vector<ExprValue>& input);
Expand All @@ -66,6 +67,11 @@ ExprValue lpad(const std::vector<ExprValue>& input);
ExprValue rpad(const std::vector<ExprValue>& input);
ExprValue instr(const std::vector<ExprValue>& input);
ExprValue json_extract(const std::vector<ExprValue>& input);
ExprValue json_extract1(const std::vector<ExprValue>& input);
ExprValue json_type(const std::vector<ExprValue>& input);
ExprValue json_array(const std::vector<ExprValue>& input);
ExprValue json_object(const std::vector<ExprValue>& input);
ExprValue json_valid(const std::vector<ExprValue>& input);
ExprValue export_set(const std::vector<ExprValue>& input);
ExprValue to_base64(const std::vector<ExprValue>& input);
ExprValue from_base64(const std::vector<ExprValue>& input);
Expand Down
2 changes: 2 additions & 0 deletions include/logical_plan/select_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class SelectPlanner : public LogicalPlanner {
int parse_limit();

int subquery_rewrite();

int minmax_remove();

bool is_full_export();

Expand Down
1 change: 1 addition & 0 deletions include/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ class RuntimeState {
int range_count_limit = 0;
int64_t _sql_exec_timeout = -1;
bool _is_ddl_work = false;
bool must_have_one = false;
private:
bool _is_inited = false;
bool _is_cancelled = false;
Expand Down
13 changes: 7 additions & 6 deletions include/runtime/sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@ class Sorter {
public:
Sorter(MemRowCompare* comp) : _comp(comp), _idx(0) {
}
void add_batch(std::shared_ptr<RowBatch>& batch) {
virtual void add_batch(std::shared_ptr<RowBatch>& batch) {
batch->reset();
_min_heap.push_back(batch);
}
void sort();
void merge_sort();
int get_next(RowBatch* batch, bool* eos);
virtual void sort();
virtual void merge_sort();
virtual int get_next(RowBatch* batch, bool* eos);

size_t batch_size() {
return _min_heap.size();
}
private:
void multi_sort();
void make_heap();
void shiftdown(size_t index);
virtual void shiftdown(size_t index);

private:
protected:
MemRowCompare* _comp;
private:
std::vector<std::shared_ptr<RowBatch>> _min_heap;
size_t _idx;
};
Expand Down
52 changes: 52 additions & 0 deletions include/runtime/topn_sorter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2018-present Baidu, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <algorithm>
#include <vector>
#include "common.h"
#include "row_batch.h"
#include "mem_row_compare.h"
#include "sorter.h"

namespace baikaldb {
//对每个batch并行的做sort后,再用heap做归并

struct TopNHeapItem {
std::unique_ptr<baikaldb::MemRow> row;
int64_t idx;
};

class TopNSorter : public Sorter {
public:
TopNSorter(MemRowCompare* comp, int64_t limit) : Sorter(comp), _limit(limit) {
}
virtual void add_batch(std::shared_ptr<RowBatch>& batch);
virtual void sort();
virtual void merge_sort(){}
virtual int get_next(RowBatch* batch, bool* eos);
private:
virtual void shiftdown(size_t index);
virtual void shiftup(size_t index);

private:
std::vector<TopNHeapItem> _mem_row_heap;
int64_t _limit = -1;
int64_t _current_count = 0;
int64_t _current_idx = 0;
};
}

/* vim: set ts=4 sw=4 sts=4 tw=100 */
10 changes: 1 addition & 9 deletions include/session/user_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,7 @@ struct UserInfo {

~UserInfo() {}

bool is_exceed_quota() {
if (query_cost.get_time() > 1000000) {
query_cost.reset();
query_count = 0;
return false;
}
return query_count++ > query_quota;
}

bool is_exceed_quota();
bool connection_inc() {
bool res = false;
std::lock_guard<std::mutex> guard(conn_mutex);
Expand Down
2 changes: 2 additions & 0 deletions include/sqlparser/sql_lex.l
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ VAR_SAMP { un_reserved_keyword(yylval, yyscanner, parser); return VAR_SAMP; }
\|\| { return OR; }
\<\< { return LS_OP; }
\>\> { return RS_OP; }
\-\> { return JS_OP; }
\-\>\> { return JS_OP1; }

[0-9]+ {
//integer
Expand Down
37 changes: 35 additions & 2 deletions include/sqlparser/sql_parse.y
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ extern int sql_error(YYLTYPE* yylloc, yyscan_t yyscanner, SqlParser* parser, con
VAR_SAMP
USER_AGG

%token EQ_OP ASSIGN_OP MOD_OP GE_OP GT_OP LE_OP LT_OP NE_OP AND_OP OR_OP NOT_OP LS_OP RS_OP CHINESE_DOT
%token EQ_OP ASSIGN_OP MOD_OP GE_OP GT_OP LE_OP LT_OP NE_OP AND_OP OR_OP NOT_OP LS_OP RS_OP CHINESE_DOT JS_OP JS_OP1
%token <string> IDENT
%token <expr> STRING_LIT INTEGER_LIT DECIMAL_LIT PLACE_HOLDER_LIT

Expand Down Expand Up @@ -760,7 +760,8 @@ extern int sql_error(YYLTYPE* yylloc, yyscan_t yyscanner, SqlParser* parser, con
%left EQ_OP NE_OP GE_OP GT_OP LE_OP LT_OP IS LIKE IN
%left '|'
%left '&'
%left LS_OP RS_OP
%left JS_OP1
%left LS_OP RS_OP JS_OP
%left '+' '-'
%left '*' '/' MOD_OP MOD
%left '^'
Expand Down Expand Up @@ -1866,6 +1867,38 @@ SelectField:
select_field->as_name = $5;
$$ = select_field;
}
| ColumnName JS_OP STRING_LIT {
SelectField* select_field = new_node(SelectField);
FuncExpr* fun = new_node(FuncExpr);
fun->fn_name = "json_extract1";
fun->children.push_back($1, parser->arena);
fun->children.push_back($3, parser->arena);
select_field->expr = fun;
parser::String t1, t2;
t1 = "->\"";
t2 = "\"";
select_field->org_name = ((ColumnName*) $1)->name;
select_field->org_name.append("->\"", parser->arena);
select_field->org_name.append(((LiteralExpr*)$3)->_u.str_val.c_str(), parser->arena);
select_field->org_name.append("\"", parser->arena);
$$ = select_field;
}
| ColumnName JS_OP1 STRING_LIT {
SelectField* select_field = new_node(SelectField);
FuncExpr* fun = new_node(FuncExpr);
fun->fn_name = "json_extract";
fun->children.push_back($1, parser->arena);
fun->children.push_back($3, parser->arena);
select_field->expr = fun;
parser::String t1, t2;
t1 = "->\"";
t2 = "\"";
select_field->org_name = ((ColumnName*) $1)->name;
select_field->org_name.append("->\"", parser->arena);
select_field->org_name.append(((LiteralExpr*)$3)->_u.str_val.c_str(), parser->arena);
select_field->org_name.append("\"", parser->arena);
$$ = select_field;
}
;
FieldAsNameOpt:
/* EMPTY */
Expand Down
5 changes: 3 additions & 2 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ enum PrimitiveType {
HEX = 20;
BITMAP = 21;
TDIGEST = 22;
MAXVALUE_TYPE = 23;
JSON = 23;
MAXVALUE_TYPE = 24;
};

enum SchemaType {
Expand Down Expand Up @@ -146,4 +147,4 @@ message ExprValue {
optional float float_val = 7;
optional double double_val = 8;
optional bytes string_val = 9;
};
};
1 change: 1 addition & 0 deletions src/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ int primitive_to_proto_type(pb::PrimitiveType type) {
{ pb::BOOL, FieldDescriptorProto::TYPE_BOOL},
{ pb::BITMAP, FieldDescriptorProto::TYPE_BYTES},
{ pb::TDIGEST, FieldDescriptorProto::TYPE_BYTES},
{ pb::JSON, FieldDescriptorProto::TYPE_BYTES},
{ pb::NULL_TYPE, FieldDescriptorProto::TYPE_BOOL}
};
if (_mysql_pb_type_mapping.count(type) == 0) {
Expand Down
Loading
Loading