From 25c19691f34583d6ec7c30e392569f12cd10be16 Mon Sep 17 00:00:00 2001 From: Leo Xie Date: Mon, 14 Oct 2024 09:30:20 +0800 Subject: [PATCH] Dev measure aggregate function (#528) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * MeasureAggregateFunctionService Support --------- Co-authored-by: 吴晟 Wu Sheng Co-authored-by: Gao Hongtao --- .../measure/aggregate/aggregate_function.go | 39 ++++++++-- banyand/measure/aggregate/avg.go | 54 +++++++------- banyand/measure/aggregate/avg_test.go | 12 ++- banyand/measure/aggregate/count.go | 53 ++++++++++++++ banyand/measure/aggregate/count_test.go | 40 ++++++++++ banyand/measure/aggregate/max.go | 47 ++++++++++++ banyand/measure/aggregate/max_test.go | 49 +++++++++++++ banyand/measure/aggregate/min.go | 19 ++--- banyand/measure/aggregate/min_test.go | 6 +- banyand/measure/aggregate/percent.go | 73 +++++++++++++++++++ banyand/measure/aggregate/percent_test.go | 41 +++++++++++ banyand/measure/aggregate/rate.go | 72 ++++++++++++++++++ banyand/measure/aggregate/rate_test.go | 41 +++++++++++ banyand/measure/aggregate/sum.go | 54 ++++++++++++++ banyand/measure/aggregate/sum_test.go | 67 +++++++++++++++++ 15 files changed, 616 insertions(+), 51 deletions(-) create mode 100644 banyand/measure/aggregate/count.go create mode 100644 banyand/measure/aggregate/count_test.go create mode 100644 banyand/measure/aggregate/max.go create mode 100644 banyand/measure/aggregate/max_test.go create mode 100644 banyand/measure/aggregate/percent.go create mode 100644 banyand/measure/aggregate/percent_test.go create mode 100644 banyand/measure/aggregate/rate.go create mode 100644 banyand/measure/aggregate/rate_test.go create mode 100644 banyand/measure/aggregate/sum.go create mode 100644 banyand/measure/aggregate/sum_test.go diff --git a/banyand/measure/aggregate/aggregate_function.go b/banyand/measure/aggregate/aggregate_function.go index 69427c94f..6e935825b 100644 --- a/banyand/measure/aggregate/aggregate_function.go +++ b/banyand/measure/aggregate/aggregate_function.go @@ -26,11 +26,13 @@ import ( ) // Void type contains nothing. It works as a placeholder for type parameters of `Arguments`. -type Void struct{} +// It's implemented as int64, but it won't be used as an int64. +type Void int64 -// Input covers possible types of Function's arguments. It synchronizes with `FieldType` in schema. +// Input covers possible types of Function's arguments. It synchronizes with `FieldType`. +// It also covers Void type. type Input interface { - Void | ~int64 | ~float64 + ~int64 | ~float64 } // Output covers possible types of Function's return value. @@ -38,8 +40,6 @@ type Output interface { ~int64 | ~float64 } -var errFieldValueType = fmt.Errorf("unsupported input value type on this field") - // Arguments represents the argument array, with one argument or two arguments. type Arguments[A, B Input] struct { arg0 []A @@ -52,8 +52,9 @@ type Function[A, B Input, R Output] interface { // It uses a two-dimensional array to represent the argument array. Combine(arguments Arguments[A, B]) error - // Result gives the result for the aggregation. - Result() R + // Result gives the result for the aggregation. R is the aggregating result, + // A is the first aggregating state, and B is the second aggregating state. + Result() (A, B, R) } // NewFunction constructs the aggregate function with given kind and parameter types. @@ -62,8 +63,18 @@ func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[ switch kind { case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN: function = &Min[A, B, R]{minimum: maxValue[R]()} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX: + function = &Max[A, B, R]{maximum: minValue[R]()} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT: + function = &Count[A, B, R]{count: 0} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM: + function = &Sum[A, B, R]{summation: zeroValue[R]()} case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG: function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT: + function = &Percent[A, B, R]{total: 0, match: 0} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE: + function = &Rate[A, B, R]{denominator: 0, numerator: 0} default: return nil, fmt.Errorf("MeasureAggregate unknown") } @@ -76,6 +87,20 @@ func zeroValue[R Output]() R { return r } +func minValue[R Output]() (r R) { + switch a := any(&r).(type) { + case *int64: + *a = math.MinInt64 + case *float64: + *a = -math.MaxFloat64 + case *string: + *a = "" + default: + panic("unreachable") + } + return +} + func maxValue[R Output]() (r R) { switch a := any(&r).(type) { case *int64: diff --git a/banyand/measure/aggregate/avg.go b/banyand/measure/aggregate/avg.go index c2fccf0cd..5f7043ab0 100644 --- a/banyand/measure/aggregate/avg.go +++ b/banyand/measure/aggregate/avg.go @@ -24,40 +24,44 @@ type Avg[A, B Input, R Output] struct { } // Combine takes elements to do the aggregation. -// Avg uses type parameter A and B. -func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error { - for _, arg0 := range arguments.arg0 { - switch arg0 := any(arg0).(type) { - case int64: - a.summation += R(arg0) - case float64: - a.summation += R(arg0) - default: - return errFieldValueType - } +// Avg uses type parameter A. +func (f *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error { + i := 0 + n := len(arguments.arg0) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.summation += R(arguments.arg0[i]) + R(arguments.arg0[i+1]) + + R(arguments.arg0[i+2]) + R(arguments.arg0[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.summation += R(arguments.arg0[i]) } - for _, arg1 := range arguments.arg1 { - switch arg1 := any(arg1).(type) { - case int64: - a.count += arg1 - default: - return errFieldValueType - } + i = 0 + n = len(arguments.arg1) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.count += int64(arguments.arg1[i]) + int64(arguments.arg1[i+1]) + + int64(arguments.arg1[i+2]) + int64(arguments.arg1[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.count += int64(arguments.arg1[i]) } return nil } // Result gives the result for the aggregation. -func (a *Avg[A, B, R]) Result() R { - // In unusual situations it returns the zero value. - if a.count == 0 { - return zeroValue[R]() +func (f *Avg[A, B, R]) Result() (A, B, R) { + var average R + if f.count != 0 { + // According to the semantics of GoLang, the division of one int by another int + // returns an int, instead of f float. + average = f.summation / R(f.count) } - // According to the semantics of GoLang, the division of one int by another int - // returns an int, instead of a float. - return a.summation / R(a.count) + return A(f.summation), B(f.count), average } // NewAvgArguments constructs arguments. diff --git a/banyand/measure/aggregate/avg_test.go b/banyand/measure/aggregate/avg_test.go index 34801eb12..cc9321986 100644 --- a/banyand/measure/aggregate/avg_test.go +++ b/banyand/measure/aggregate/avg_test.go @@ -32,11 +32,14 @@ func TestAvg(t *testing.T) { // case1: input int64 values avgInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG) err = avgInt64.Combine(aggregate.NewAvgArguments[int64]( - []int64{1, 2, 3}, // mock the "summation" column + []int64{1, 3, 3}, // mock the "summation" column []int64{1, 1, 1}, // mock the "count" column )) assert.NoError(t, err) - assert.Equal(t, int64(2), avgInt64.Result()) // note that 7/3 becomes 2 as int + a1, b1, r1 := avgInt64.Result() + assert.Equal(t, int64(7), a1) + assert.Equal(t, int64(3), b1) + assert.Equal(t, int64(2), r1) // note that 7/3 becomes 2 as int // case2: input float64 elements avgFloat64, _ := aggregate.NewFunction[float64, int64, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG) @@ -45,5 +48,8 @@ func TestAvg(t *testing.T) { []int64{1, 1, 1}, // mock the "count" column )) assert.NoError(t, err) - assert.Equal(t, 7.0/3, avgFloat64.Result()) + a2, b2, r2 := avgFloat64.Result() + assert.Equal(t, 7.0, a2) + assert.Equal(t, int64(3), b2) + assert.Equal(t, 7.0/3, r2) } diff --git a/banyand/measure/aggregate/count.go b/banyand/measure/aggregate/count.go new file mode 100644 index 000000000..ec2390f4c --- /dev/null +++ b/banyand/measure/aggregate/count.go @@ -0,0 +1,53 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Count calculates the count value of elements. +type Count[A, B Input, R Output] struct { + count int64 +} + +// Combine takes elements to do the aggregation. +// Count uses none of type parameters. +func (f *Count[A, B, R]) Combine(arguments Arguments[A, B]) error { + i := 0 + n := len(arguments.arg0) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.count += int64(arguments.arg0[i]) + int64(arguments.arg0[i+1]) + + int64(arguments.arg0[i+2]) + int64(arguments.arg0[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.count += int64(arguments.arg0[i]) + } + return nil +} + +// Result gives the result for the aggregation. +func (f *Count[A, B, R]) Result() (A, B, R) { + return A(f.count), zeroValue[B](), R(f.count) +} + +// NewCountArguments constructs arguments. +func NewCountArguments(a []int64) Arguments[int64, Void] { + return Arguments[int64, Void]{ + arg0: a, + arg1: nil, + } +} diff --git a/banyand/measure/aggregate/count_test.go b/banyand/measure/aggregate/count_test.go new file mode 100644 index 000000000..13bf9b0ba --- /dev/null +++ b/banyand/measure/aggregate/count_test.go @@ -0,0 +1,40 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestCount(t *testing.T) { + var err error + + // case1: input int64 values + countInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT) + err = countInt64.Combine(aggregate.NewCountArguments( + []int64{1, 2, 3}, // mock the "count" column + )) + assert.NoError(t, err) + _, _, r1 := countInt64.Result() + assert.Equal(t, int64(6), r1) +} diff --git a/banyand/measure/aggregate/max.go b/banyand/measure/aggregate/max.go new file mode 100644 index 000000000..995fb888f --- /dev/null +++ b/banyand/measure/aggregate/max.go @@ -0,0 +1,47 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Max calculates the maximum value of elements. +type Max[A, B Input, R Output] struct { + maximum R +} + +// Combine takes elements to do the aggregation. +// Max uses type parameter A. +func (f *Max[A, B, R]) Combine(arguments Arguments[A, B]) error { + for _, arg0 := range arguments.arg0 { + if R(arg0) > f.maximum { + f.maximum = R(arg0) + } + } + return nil +} + +// Result gives the result for the aggregation. +func (f *Max[A, B, R]) Result() (A, B, R) { + return A(f.maximum), zeroValue[B](), f.maximum +} + +// NewMaxArguments constructs arguments. +func NewMaxArguments[A Input](a []A) Arguments[A, Void] { + return Arguments[A, Void]{ + arg0: a, + arg1: nil, + } +} diff --git a/banyand/measure/aggregate/max_test.go b/banyand/measure/aggregate/max_test.go new file mode 100644 index 000000000..7a99cf215 --- /dev/null +++ b/banyand/measure/aggregate/max_test.go @@ -0,0 +1,49 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestMax(t *testing.T) { + var err error + + // case1: input int64 values + maxInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX) + err = maxInt64.Combine(aggregate.NewMaxArguments[int64]( + []int64{1, 2, 3}, // mock the "maximum" column + )) + assert.NoError(t, err) + _, _, r1 := maxInt64.Result() + assert.Equal(t, int64(3), r1) + + // case2: input float64 values + maxFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX) + err = maxFloat64.Combine(aggregate.NewMaxArguments[float64]( + []float64{1.0, 2.0, 3.0}, // mock the "maximum" column + )) + assert.NoError(t, err) + _, _, r2 := maxFloat64.Result() + assert.Equal(t, 3.0, r2) +} diff --git a/banyand/measure/aggregate/min.go b/banyand/measure/aggregate/min.go index 2caacd471..27b0f6660 100644 --- a/banyand/measure/aggregate/min.go +++ b/banyand/measure/aggregate/min.go @@ -24,27 +24,18 @@ type Min[A, B Input, R Output] struct { // Combine takes elements to do the aggregation. // Min uses type parameter A. -func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error { +func (f *Min[A, B, R]) Combine(arguments Arguments[A, B]) error { for _, arg0 := range arguments.arg0 { - switch arg0 := any(arg0).(type) { - case int64: - if R(arg0) < m.minimum { - m.minimum = R(arg0) - } - case float64: - if R(arg0) < m.minimum { - m.minimum = R(arg0) - } - default: - return errFieldValueType + if R(arg0) < f.minimum { + f.minimum = R(arg0) } } return nil } // Result gives the result for the aggregation. -func (m *Min[A, B, R]) Result() R { - return m.minimum +func (f *Min[A, B, R]) Result() (A, B, R) { + return A(f.minimum), zeroValue[B](), f.minimum } // NewMinArguments constructs arguments. diff --git a/banyand/measure/aggregate/min_test.go b/banyand/measure/aggregate/min_test.go index fa6fd9499..e642e5155 100644 --- a/banyand/measure/aggregate/min_test.go +++ b/banyand/measure/aggregate/min_test.go @@ -35,7 +35,8 @@ func TestMin(t *testing.T) { []int64{1, 2, 3}, // mock the "minimum" column )) assert.NoError(t, err) - assert.Equal(t, int64(1), minInt64.Result()) + _, _, r1 := minInt64.Result() + assert.Equal(t, int64(1), r1) // case2: input float64 values minFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN) @@ -43,5 +44,6 @@ func TestMin(t *testing.T) { []float64{1.0, 2.0, 3.0}, // mock the "minimum" column )) assert.NoError(t, err) - assert.Equal(t, 1.0, minFloat64.Result()) + _, _, r2 := minFloat64.Result() + assert.Equal(t, 1.0, r2) } diff --git a/banyand/measure/aggregate/percent.go b/banyand/measure/aggregate/percent.go new file mode 100644 index 000000000..86475bf71 --- /dev/null +++ b/banyand/measure/aggregate/percent.go @@ -0,0 +1,73 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Percent calculates the average value of elements. +type Percent[A, B Input, R Output] struct { + total int64 + match int64 +} + +// Combine takes elements to do the aggregation. +// Percent uses none of type parameters. +func (f *Percent[A, B, R]) Combine(arguments Arguments[A, B]) error { + i := 0 + n := len(arguments.arg0) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.total += int64(arguments.arg0[i]) + int64(arguments.arg0[i+1]) + + int64(arguments.arg0[i+2]) + int64(arguments.arg0[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.total += int64(arguments.arg0[i]) + } + + i = 0 + n = len(arguments.arg1) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.match += int64(arguments.arg1[i]) + int64(arguments.arg1[i+1]) + + int64(arguments.arg1[i+2]) + int64(arguments.arg1[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.match += int64(arguments.arg1[i]) + } + + return nil +} + +// Result gives the result for the aggregation. +func (f *Percent[A, B, R]) Result() (A, B, R) { + var percent R + if f.total != 0 { + // Factory 10000 is used to improve accuracy. This factory is same as OAP. + // For example, "10 percent" will return 1000. + percent = R(f.match) * 10000 / R(f.total) + } + return A(f.total), B(f.match), percent +} + +// NewPercentArguments constructs arguments. +func NewPercentArguments(a []int64, b []int64) Arguments[int64, int64] { + return Arguments[int64, int64]{ + arg0: a, + arg1: b, + } +} diff --git a/banyand/measure/aggregate/percent_test.go b/banyand/measure/aggregate/percent_test.go new file mode 100644 index 000000000..0dbb2d422 --- /dev/null +++ b/banyand/measure/aggregate/percent_test.go @@ -0,0 +1,41 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestPercent(t *testing.T) { + var err error + + // case1: input int64 values + percentInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT) + err = percentInt64.Combine(aggregate.NewPercentArguments( + []int64{10, 100, 1000}, // mock the "total" column + []int64{1, 10, 100}, // mock the "match" column + )) + assert.NoError(t, err) + _, _, r1 := percentInt64.Result() + assert.Equal(t, int64(1000), r1) +} diff --git a/banyand/measure/aggregate/rate.go b/banyand/measure/aggregate/rate.go new file mode 100644 index 000000000..a729f03ee --- /dev/null +++ b/banyand/measure/aggregate/rate.go @@ -0,0 +1,72 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Rate calculates the average value of elements. +type Rate[A, B Input, R Output] struct { + denominator int64 + numerator int64 +} + +// Combine takes elements to do the aggregation. +// Rate uses none of type parameters. +func (f *Rate[A, B, R]) Combine(arguments Arguments[A, B]) error { + i := 0 + n := len(arguments.arg0) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.denominator += int64(arguments.arg0[i]) + int64(arguments.arg0[i+1]) + + int64(arguments.arg0[i+2]) + int64(arguments.arg0[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.denominator += int64(arguments.arg0[i]) + } + + i = 0 + n = len(arguments.arg1) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.numerator += int64(arguments.arg1[i]) + int64(arguments.arg1[i+1]) + + int64(arguments.arg1[i+2]) + int64(arguments.arg1[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.numerator += int64(arguments.arg1[i]) + } + + return nil +} + +// Result gives the result for the aggregation. +func (f *Rate[A, B, R]) Result() (A, B, R) { + var rate R + if f.denominator != 0 { + // Factory 10000 is used to improve accuracy. This factory is same as OAP. + rate = R(f.numerator) * 10000 / R(f.denominator) + } + return A(f.denominator), B(f.numerator), rate +} + +// NewRateArguments constructs arguments. +func NewRateArguments(a []int64, b []int64) Arguments[int64, int64] { + return Arguments[int64, int64]{ + arg0: a, + arg1: b, + } +} diff --git a/banyand/measure/aggregate/rate_test.go b/banyand/measure/aggregate/rate_test.go new file mode 100644 index 000000000..50b3f6ea3 --- /dev/null +++ b/banyand/measure/aggregate/rate_test.go @@ -0,0 +1,41 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestRate(t *testing.T) { + var err error + + // case1: input int64 values + rateInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE) + err = rateInt64.Combine(aggregate.NewRateArguments( + []int64{10, 100, 1000}, // mock the "denominator" column + []int64{1, 10, 100}, // mock the "numerator" column + )) + assert.NoError(t, err) + _, _, r1 := rateInt64.Result() + assert.Equal(t, int64(1000), r1) +} diff --git a/banyand/measure/aggregate/sum.go b/banyand/measure/aggregate/sum.go new file mode 100644 index 000000000..cdac56c5c --- /dev/null +++ b/banyand/measure/aggregate/sum.go @@ -0,0 +1,54 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Sum calculates the summation value of elements. +type Sum[A, B Input, R Output] struct { + summation R +} + +// Combine takes elements to do the aggregation. +// Sum uses type parameter A. +func (f *Sum[A, B, R]) Combine(arguments Arguments[A, B]) error { + i := 0 + n := len(arguments.arg0) + // step-4 aggregate + for ; i <= n-4; i += 4 { + f.summation += R(arguments.arg0[i]) + R(arguments.arg0[i+1]) + + R(arguments.arg0[i+2]) + R(arguments.arg0[i+3]) + } + // tail aggregate + for ; i < n; i++ { + f.summation += R(arguments.arg0[i]) + } + + return nil +} + +// Result gives the result for the aggregation. +func (f *Sum[A, B, R]) Result() (A, B, R) { + return A(f.summation), zeroValue[B](), f.summation +} + +// NewSumArguments constructs arguments. +func NewSumArguments[A Input](a []A) Arguments[A, Void] { + return Arguments[A, Void]{ + arg0: a, + arg1: nil, + } +} diff --git a/banyand/measure/aggregate/sum_test.go b/banyand/measure/aggregate/sum_test.go new file mode 100644 index 000000000..07623a2f4 --- /dev/null +++ b/banyand/measure/aggregate/sum_test.go @@ -0,0 +1,67 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestSum(t *testing.T) { + var err error + + // case1: input int64 values + sumInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM) + err = sumInt64.Combine(aggregate.NewSumArguments[int64]( + []int64{1, 2, 3}, // mock the "summation" column + )) + assert.NoError(t, err) + _, _, r1 := sumInt64.Result() + assert.Equal(t, int64(6), r1) + + // case2: input float64 values + sumFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM) + err = sumFloat64.Combine(aggregate.NewSumArguments[float64]( + []float64{1.0, 2.0, 3.0}, // mock the "summation" column + )) + assert.NoError(t, err) + _, _, r2 := sumFloat64.Result() + assert.Equal(t, 6.0, r2) + + // case3: 7 values inputted + sumFloat64, _ = aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM) + err = sumFloat64.Combine(aggregate.NewSumArguments[float64]( + []float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0}, // mock the "summation" column + )) + assert.NoError(t, err) + _, _, r3 := sumFloat64.Result() + assert.Equal(t, 28.0, r3) + + // case4: 4 values inputted + sumFloat64, _ = aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM) + err = sumFloat64.Combine(aggregate.NewSumArguments[float64]( + []float64{1.0, 2.0, 3.0, 4.0}, // mock the "summation" column + )) + assert.NoError(t, err) + _, _, r4 := sumFloat64.Result() + assert.Equal(t, 10.0, r4) +}