diff --git a/CHANGES.md b/CHANGES.md index 423803ad9..dcfd8e6b9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,10 @@ Release Notes. ## 0.8.0 +### Bug Fixes + +- Fix the bug that TopN processing item leak. The item can not be updated but as a new item. + ### Documentation - Improve the description of the memory in observability doc. diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index 750b0f065..c64270aaf 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -64,6 +64,7 @@ var ( type dataPointWithEntityValues struct { *measurev1.DataPointValue entityValues []*modelv1.TagValue + seriesID uint64 } type topNStreamingProcessor struct { @@ -272,6 +273,9 @@ func (t *topNStreamingProcessor) start() *topNStreamingProcessor { t.errCh = t.streamingFlow.Window(streaming.NewTumblingTimeWindows(t.interval, flushInterval)). AllowedMaxWindows(int(t.topNSchema.GetLruSize())). TopN(int(t.topNSchema.GetCountersNumber()), + streaming.WithKeyExtractor(func(record flow.StreamRecord) uint64 { + return record.Data().(flow.Data)[4].(uint64) + }), streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 { return record.Data().(flow.Data)[2].(int64) }), @@ -322,7 +326,7 @@ func (manager *topNProcessorManager) Close() error { return err } -func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalWriteRequest) { +func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, request *measurev1.InternalWriteRequest) { go func() { manager.RLock() defer manager.RUnlock() @@ -331,6 +335,7 @@ func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalW processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{ request.GetRequest().GetDataPoint(), request.GetEntityValues(), + seriesID, }, request.GetRequest().GetDataPoint().GetTimestamp()) } } @@ -436,6 +441,7 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(), // groupBy tag values as v3 nil, + dpWithEvs.seriesID, } }, nil } @@ -458,6 +464,7 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames transform(groupLocator, func(locator partition.TagLocator) *modelv1.TagValue { return extractTagValue(dpWithEvs.DataPointValue, locator) }), + dpWithEvs.seriesID, } }, nil } diff --git a/banyand/measure/write.go b/banyand/measure/write.go index 64362ab2d..2eaaffddd 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -193,7 +193,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamilies) if stm.processorManager != nil { - stm.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{ + stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{ Request: &measurev1.WriteRequest{ Metadata: stm.GetSchema().Metadata, DataPoint: req.DataPoint, diff --git a/pkg/flow/streaming/streaming_test.go b/pkg/flow/streaming/streaming_test.go index 796022e5f..a3556bd7a 100644 --- a/pkg/flow/streaming/streaming_test.go +++ b/pkg/flow/streaming/streaming_test.go @@ -25,6 +25,7 @@ import ( g "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/flow" "github.com/apache/skywalking-banyandb/pkg/test/flags" flowTest "github.com/apache/skywalking-banyandb/pkg/test/flow" @@ -146,14 +147,17 @@ var _ = g.Describe("Streaming", func() { f = New("test", flowTest.NewSlice(input)). Map(flow.UnaryFunc[any](func(_ context.Context, item interface{}) interface{} { // groupBy - return flow.Data{item.(*record).service, int64(item.(*record).value)} + return flow.Data{item.(*record).service, int64(item.(*record).value), item.(*record).service + item.(*record).instance} })). Window(NewTumblingTimeWindows(15*time.Second, 15*time.Second)). - TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 { - return record.Data().(flow.Data)[1].(int64) - }), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string { - return record.Data().(flow.Data)[0].(string) - })). + TopN(3, WithKeyExtractor(func(record flow.StreamRecord) uint64 { + return convert.HashStr(record.Data().(flow.Data)[2].(string)) + }), + WithSortKeyExtractor(func(record flow.StreamRecord) int64 { + return record.Data().(flow.Data)[1].(int64) + }), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string { + return record.Data().(flow.Data)[0].(string) + })). To(snk) errCh = f.Open() @@ -179,15 +183,15 @@ var _ = g.Describe("Streaming", func() { g.Expect(len(snk.Value())).Should(gomega.BeNumerically(">=", 1)) // e2e-service-consumer Group g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(gomega.BeEquivalentTo([]*Tuple2{ - {int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500)}, 7000)}, - {int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)}, - {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)}, + {int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500), "e2e-service-consumerinstance-001"}, 7000)}, + {int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600), "e2e-service-consumerinstance-004"}, 6000)}, + {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700), "e2e-service-consumerinstance-002"}, 4000)}, })) // e2e-service-provider Group g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(gomega.BeEquivalentTo([]*Tuple2{ - {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)}, - {int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)}, - {int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)}, + {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700), "e2e-service-providerinstance-003"}, 5000)}, + {int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800), "e2e-service-providerinstance-002"}, 3000)}, + {int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000), "e2e-service-providerinstance-001"}, 1000)}, })) }).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed()) }) @@ -209,10 +213,12 @@ var _ = g.Describe("Streaming", func() { f = New("test", flowTest.NewSlice(input)). Map(flow.UnaryFunc[any](func(_ context.Context, item interface{}) interface{} { // groupBy - return flow.Data{item.(*record).service, int64(item.(*record).value)} + return flow.Data{item.(*record).service, int64(item.(*record).value), item.(*record).service + item.(*record).instance} })). Window(NewTumblingTimeWindows(15*time.Second, 15*time.Second)). - TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 { + TopN(3, WithKeyExtractor(func(record flow.StreamRecord) uint64 { + return convert.HashStr(record.Data().(flow.Data)[2].(string)) + }), WithSortKeyExtractor(func(record flow.StreamRecord) int64 { return record.Data().(flow.Data)[1].(int64) }), WithGroupKeyExtractor(func(record flow.StreamRecord) string { return record.Data().(flow.Data)[0].(string) @@ -242,15 +248,15 @@ var _ = g.Describe("Streaming", func() { g.Expect(len(snk.Value())).Should(gomega.BeNumerically(">=", 1)) // e2e-service-consumer Group g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(gomega.BeEquivalentTo([]*Tuple2{ - {int64(9900), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9900)}, 2000)}, - {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)}, - {int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)}, + {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700), "e2e-service-consumerinstance-002"}, 4000)}, + {int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600), "e2e-service-consumerinstance-004"}, 6000)}, + {int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500), "e2e-service-consumerinstance-001"}, 7000)}, })) // e2e-service-provider Group g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(gomega.BeEquivalentTo([]*Tuple2{ - {int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)}, - {int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)}, - {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)}, + {int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000), "e2e-service-providerinstance-001"}, 1000)}, + {int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800), "e2e-service-providerinstance-002"}, 3000)}, + {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700), "e2e-service-providerinstance-003"}, 5000)}, })) }).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed()) }) diff --git a/pkg/flow/streaming/topn.go b/pkg/flow/streaming/topn.go index eaaafe247..542382ec5 100644 --- a/pkg/flow/streaming/topn.go +++ b/pkg/flow/streaming/topn.go @@ -76,6 +76,7 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow { type topNAggregatorGroup struct { aggregatorGroup map[string]*topNAggregator + keyExtractor func(flow.StreamRecord) uint64 sortKeyExtractor func(flow.StreamRecord) int64 groupKeyExtractor func(flow.StreamRecord) string comparator utils.Comparator @@ -86,9 +87,9 @@ type topNAggregatorGroup struct { type topNAggregator struct { *topNAggregatorGroup - treeMap *treemap.Map - currentTopNum int - dirty bool + treeMap *treemap.Map + dict map[uint64]int64 + dirty bool } // TopNOption is the option to set up a top-n aggregator group. @@ -101,6 +102,13 @@ func WithSortKeyExtractor(sortKeyExtractor func(flow.StreamRecord) int64) TopNOp } } +// WithKeyExtractor sets a closure to extract the key. +func WithKeyExtractor(keyExtractor func(flow.StreamRecord) uint64) TopNOption { + return func(aggregator *topNAggregatorGroup) { + aggregator.keyExtractor = keyExtractor + } +} + // WithGroupKeyExtractor extract group key from the StreamRecord. func WithGroupKeyExtractor(groupKeyExtractor func(flow.StreamRecord) string) TopNOption { return func(aggregator *topNAggregatorGroup) { @@ -117,14 +125,16 @@ func OrderBy(sort TopNSort) TopNOption { func (t *topNAggregatorGroup) Add(input []flow.StreamRecord) { for _, item := range input { + key := t.keyExtractor(item) sortKey := t.sortKeyExtractor(item) groupKey := t.groupKeyExtractor(item) aggregator := t.getOrCreateGroup(groupKey) + aggregator.removeExistedItem(key) if aggregator.checkSortKeyInBufferRange(sortKey) { if e := t.l.Debug(); e.Enabled() { - e.Str("group", groupKey).Time("elem_ts", time.Unix(0, item.TimestampMillis()*int64(time.Millisecond))).Msg("put into topN buffer") + e.Str("group", groupKey).Uint64("key", key).Time("elem_ts", time.Unix(0, item.TimestampMillis()*int64(time.Millisecond))).Msg("put into topN buffer") } - aggregator.put(sortKey, item) + aggregator.put(key, sortKey, item) aggregator.doCleanUp() } } @@ -138,7 +148,7 @@ func (t *topNAggregatorGroup) Snapshot() interface{} { } aggregator.dirty = false iter := aggregator.treeMap.Iterator() - items := make([]*Tuple2, 0, aggregator.currentTopNum) + items := make([]*Tuple2, 0, aggregator.size()) for iter.Next() { list := iter.Value().([]interface{}) for _, item := range list { @@ -180,28 +190,27 @@ func (t *topNAggregatorGroup) getOrCreateGroup(group string) *topNAggregator { t.aggregatorGroup[group] = &topNAggregator{ topNAggregatorGroup: t, treeMap: treemap.NewWith(t.comparator), + dict: make(map[uint64]int64), } return t.aggregatorGroup[group] } func (t *topNAggregator) doCleanUp() { // do cleanup: maintain the treeMap windowSize - if t.currentTopNum > t.cacheSize { + if t.size() > t.cacheSize { lastKey, lastValues := t.treeMap.Max() - size := len(lastValues.([]interface{})) + l := lastValues.([]interface{}) + delete(t.dict, t.keyExtractor(l[len(l)-1].(flow.StreamRecord))) // remove last one - if size <= 1 { - t.currentTopNum -= size + if len(l) <= 1 { t.treeMap.Remove(lastKey) } else { - t.currentTopNum-- - t.treeMap.Put(lastKey, lastValues.([]interface{})[0:size-1]) + t.treeMap.Put(lastKey, l[:len(l)-1]) } } } -func (t *topNAggregator) put(sortKey int64, data flow.StreamRecord) { - t.currentTopNum++ +func (t *topNAggregator) put(key uint64, sortKey int64, data flow.StreamRecord) { t.dirty = true if existingList, ok := t.treeMap.Get(sortKey); ok { existingList = append(existingList.([]interface{}), data) @@ -209,6 +218,7 @@ func (t *topNAggregator) put(sortKey int64, data flow.StreamRecord) { } else { t.treeMap.Put(sortKey, []interface{}{data}) } + t.dict[key] = sortKey } func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool { @@ -223,7 +233,50 @@ func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool { if t.comparator(sortKey, worstKey.(int64)) < 0 { return true } - return t.currentTopNum < t.cacheSize + return t.size() < t.cacheSize +} + +func (t *topNAggregator) removeExistedItem(key uint64) { + existed, ok := t.dict[key] + if !ok { + return + } + delete(t.dict, key) + list, ok := t.treeMap.Get(existed) + if !ok { + return + } + l := list.([]interface{}) + for i := 0; i < len(l); i++ { + if t.keyExtractor(l[i].(flow.StreamRecord)) == key { + l = append(l[:i], l[i+1:]...) + } + } + if len(l) == 0 { + t.treeMap.Remove(existed) + return + } + t.treeMap.Put(existed, l) +} + +func (t *topNAggregator) size() int { + return len(t.dict) +} + +func (t *topNAggregatorGroup) leakCheck() { + for g, agg := range t.aggregatorGroup { + if agg.size() > t.cacheSize { + panic(g + "leak detected: topN buffer size exceed the cache size") + } + iter := agg.treeMap.Iterator() + count := 0 + for iter.Next() { + count += len(iter.Value().([]interface{})) + } + if count != agg.size() { + panic(g + "leak detected: treeMap size not match dictionary size") + } + } } // Tuple2 is a tuple with 2 fields. Each field may be a separate type. diff --git a/pkg/flow/streaming/topn_test.go b/pkg/flow/streaming/topn_test.go index 167818382..4d04c5bd6 100644 --- a/pkg/flow/streaming/topn_test.go +++ b/pkg/flow/streaming/topn_test.go @@ -28,106 +28,261 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" ) +type testCase struct { + expected map[string][]*Tuple2 + name string + sort TopNSort +} + func TestFlow_TopN_Aggregator(t *testing.T) { - input := []flow.StreamRecord{ - // 1. group by values - // 2. number - // 3. slices of groupBy values - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}}), - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}}), - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}}), - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}}), - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}}), - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}}), - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}}), - flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}}), + verifyFn := func(t *testing.T, input []flow.StreamRecord, tests []testCase) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + var comparator utils.Comparator + if tt.sort == DESC { + comparator = func(a, b interface{}) int { + return utils.Int64Comparator(b, a) + } + } else { + comparator = utils.Int64Comparator + } + topN := &topNAggregatorGroup{ + cacheSize: 3, + sort: tt.sort, + comparator: comparator, + aggregatorGroup: make(map[string]*topNAggregator), + keyExtractor: func(record flow.StreamRecord) uint64 { + return uint64(record.Data().(flow.Data)[0].(int)) + }, + sortKeyExtractor: func(record flow.StreamRecord) int64 { + return int64(record.Data().(flow.Data)[2].(int)) + }, + groupKeyExtractor: func(record flow.StreamRecord) string { + return record.Data().(flow.Data)[1].(string) + }, + l: logger.GetLogger("test"), + } + topN.Add(input) + topN.leakCheck() + snapshot := topN.Snapshot() + require.Len(snapshot, 2) + require.Contains(snapshot, "e2e-service-provider") // provider group + require.Contains(snapshot, "e2e-service-consumer") // consumer group + if diff := cmp.Diff(tt.expected, snapshot); diff != "" { + t.Errorf("Snapshot() mismatch (-want +got):\n%s", diff) + } + }) + } } - tests := []struct { - expected map[string][]*Tuple2 - name string - sort TopNSort - }{ - { - name: "DESC", - sort: DESC, - expected: map[string][]*Tuple2{ - "e2e-service-provider": { - {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, - {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, - {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, - }, - "e2e-service-consumer": { - {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, - {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, - {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})}, - }, + + t.Run("normal", func(t *testing.T) { + verifyFn(t, + []flow.StreamRecord{ + // 1. series id + // 2. group by values + // 3. number + // 4. slices of groupBy values + flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{6, "e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{8, "e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}}), }, - }, - { - name: "DESC by default", - sort: 0, - expected: map[string][]*Tuple2{ - "e2e-service-provider": { - {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, - {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, - {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, + []testCase{ + { + name: "DESC", + sort: DESC, + expected: map[string][]*Tuple2{ + "e2e-service-provider": { + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, + }, + "e2e-service-consumer": { + {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})}, + }, + }, + }, + { + name: "DESC by default", + sort: 0, + expected: map[string][]*Tuple2{ + "e2e-service-provider": { + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, + }, + "e2e-service-consumer": { + {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})}, + }, + }, }, - "e2e-service-consumer": { - {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, - {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, - {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})}, + { + name: "ASC", + sort: ASC, + expected: map[string][]*Tuple2{ + "e2e-service-consumer": { + {int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{8, "e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})}, + {int64(9600), flow.NewStreamRecordWithoutTS(flow.Data{6, "e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})}, + }, + "e2e-service-provider": { + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + }, + }, }, }, - }, - { - name: "ASC", - sort: ASC, - expected: map[string][]*Tuple2{ - "e2e-service-consumer": { - {int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})}, - {int64(9600), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}})}, - {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})}, + ) + }) + t.Run("duplicated with different sort key", func(t *testing.T) { + verifyFn(t, + []flow.StreamRecord{ + // 1. series id + // 2. group by values + // 3. number + // 4. slices of groupBy values + flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{6, "e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{8, "e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}}), + }, + []testCase{ + { + name: "DESC", + sort: DESC, + expected: map[string][]*Tuple2{ + "e2e-service-provider": { + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + {int64(9801), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, + }, + "e2e-service-consumer": { + {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, + {int64(9701), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}})}, + }, + }, + }, + { + name: "DESC by default", + sort: 0, + expected: map[string][]*Tuple2{ + "e2e-service-provider": { + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + {int64(9801), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, + }, + "e2e-service-consumer": { + {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, + {int64(9701), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}})}, + }, + }, }, - "e2e-service-provider": { - {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, - {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, - {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + { + name: "ASC", + sort: ASC, + expected: map[string][]*Tuple2{ + "e2e-service-consumer": { + {int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{8, "e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})}, + {int64(9600), flow.NewStreamRecordWithoutTS(flow.Data{6, "e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}})}, + {int64(9701), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}})}, + }, + "e2e-service-provider": { + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})}, + {int64(9801), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}})}, + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + }, + }, }, }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - var comparator utils.Comparator - if tt.sort == DESC { - comparator = func(a, b interface{}) int { - return utils.Int64Comparator(b, a) - } - } else { - comparator = utils.Int64Comparator - } - topN := &topNAggregatorGroup{ - cacheSize: 3, - sort: tt.sort, - comparator: comparator, - aggregatorGroup: make(map[string]*topNAggregator), - sortKeyExtractor: func(record flow.StreamRecord) int64 { - return int64(record.Data().(flow.Data)[1].(int)) + ) + }) + + t.Run("duplicated with identical sort key", func(t *testing.T) { + verifyFn(t, + []flow.StreamRecord{ + // 1. series id + // 2. group by values + // 3. number + // 4. slices of groupBy values + flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{6, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{8, "e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}}), + flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}}), + flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}}), + }, + []testCase{ + { + name: "DESC", + sort: DESC, + expected: map[string][]*Tuple2{ + "e2e-service-provider": { + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + {int64(9801), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, + }, + "e2e-service-consumer": { + {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, + {int64(9701), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}})}, + }, + }, }, - groupKeyExtractor: func(record flow.StreamRecord) string { - return record.Data().(flow.Data)[0].(string) + { + name: "DESC by default", + sort: 0, + expected: map[string][]*Tuple2{ + "e2e-service-provider": { + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + {int64(9801), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, + }, + "e2e-service-consumer": { + {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{2, "e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})}, + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{7, "e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})}, + {int64(9701), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}})}, + }, + }, }, - l: logger.GetLogger("test"), - } - topN.Add(input) - snapshot := topN.Snapshot() - require.Len(snapshot, 2) - require.Contains(snapshot, "e2e-service-provider") // provider group - require.Contains(snapshot, "e2e-service-consumer") // consumer group - if diff := cmp.Diff(tt.expected, snapshot); diff != "" { - t.Errorf("Snapshot() mismatch (-want +got):\n%s", diff) - } - }) - } + { + name: "ASC", + sort: ASC, + expected: map[string][]*Tuple2{ + "e2e-service-consumer": { + {int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{8, "e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})}, + {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{6, "e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})}, + {int64(9701), flow.NewStreamRecordWithoutTS(flow.Data{4, "e2e-service-consumer", 9701, []interface{}{"e2e-service-consumer"}})}, + }, + "e2e-service-provider": { + {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{5, "e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})}, + {int64(9801), flow.NewStreamRecordWithoutTS(flow.Data{3, "e2e-service-provider", 9801, []interface{}{"e2e-service-provider"}})}, + {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{1, "e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})}, + }, + }, + }, + }, + ) + }) }