From c64724a19cbaf4fee86d042929cd5a4a7ef8b0c6 Mon Sep 17 00:00:00 2001 From: Leo Xie Date: Mon, 21 Oct 2024 08:54:41 +0800 Subject: [PATCH] [ospp] Implements MeasureAggregateFunctionService.Support API (#545) * implements measureAggregateFunctionServer.Support * fix lint --------- Co-authored-by: Gao Hongtao --- banyand/liaison/grpc/registry.go | 65 ++++++++++++++++++++++++++++++++ banyand/liaison/grpc/server.go | 2 + 2 files changed, 67 insertions(+) diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go index ef3a59e72..5e2983a8e 100644 --- a/banyand/liaison/grpc/registry.go +++ b/banyand/liaison/grpc/registry.go @@ -27,6 +27,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) @@ -824,3 +825,67 @@ func (ts *topNAggregationRegistryServer) Exist(ctx context.Context, req *databas } return &databasev1.TopNAggregationRegistryServiceExistResponse{HasGroup: exist, HasTopNAggregation: false}, nil } + +type measureAggregateFunctionServer struct { + databasev1.UnimplementedMeasureAggregateFunctionServiceServer + supports []*databasev1.MeasureAggregateFunction +} + +func newMeasureAggregateFunctionServer() *measureAggregateFunctionServer { + functions := []*databasev1.MeasureAggregateFunction{ + { + Type: databasev1.FieldType_FIELD_TYPE_INT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_FLOAT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_INT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_FLOAT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_INT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_INT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_FLOAT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_INT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_FLOAT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_INT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT, + }, + { + Type: databasev1.FieldType_FIELD_TYPE_INT, + AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE, + }, + } + + return &measureAggregateFunctionServer{supports: functions} +} + +func (ms *measureAggregateFunctionServer) Support(_ context.Context, _ *databasev1.MeasureAggregateFunctionServiceSupportRequest) ( + *databasev1.MeasureAggregateFunctionServiceSupportResponse, error, +) { + return &databasev1.MeasureAggregateFunctionServiceSupportResponse{ + MeasureAggregateFunction: ms.supports, + }, nil +} diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index af89eb964..0250dfab4 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -73,6 +73,7 @@ type server struct { log *logger.Logger *propertyServer *topNAggregationRegistryServer + *measureAggregateFunctionServer *groupRegistryServer stopCh chan struct{} *indexRuleRegistryServer @@ -127,6 +128,7 @@ func NewServer(_ context.Context, pipeline, broadcaster queue.Client, schemaRegi topNAggregationRegistryServer: &topNAggregationRegistryServer{ schemaRegistry: schemaRegistry, }, + measureAggregateFunctionServer: newMeasureAggregateFunctionServer(), propertyServer: &propertyServer{ schemaRegistry: schemaRegistry, },