Skip to content

Commit

Permalink
Fix the bug that TopN processing item leak
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Oct 22, 2024
1 parent c64724a commit 267e05b
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 128 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Release Notes.

## 0.8.0

### Bug Fixes

- Fix the bug that TopN processing item leak. The item can not be updated but as a new item.

### Documentation

- Improve the description of the memory in observability doc.
Expand Down
9 changes: 8 additions & 1 deletion banyand/measure/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
type dataPointWithEntityValues struct {
*measurev1.DataPointValue
entityValues []*modelv1.TagValue
seriesID uint64
}

type topNStreamingProcessor struct {
Expand Down Expand Up @@ -272,6 +273,9 @@ func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
t.errCh = t.streamingFlow.Window(streaming.NewTumblingTimeWindows(t.interval, flushInterval)).
AllowedMaxWindows(int(t.topNSchema.GetLruSize())).
TopN(int(t.topNSchema.GetCountersNumber()),
streaming.WithKeyExtractor(func(record flow.StreamRecord) uint64 {
return record.Data().(flow.Data)[4].(uint64)
}),
streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[2].(int64)
}),
Expand Down Expand Up @@ -322,7 +326,7 @@ func (manager *topNProcessorManager) Close() error {
return err
}

func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalWriteRequest) {
func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, request *measurev1.InternalWriteRequest) {
go func() {
manager.RLock()
defer manager.RUnlock()
Expand All @@ -331,6 +335,7 @@ func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalW
processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
request.GetRequest().GetDataPoint(),
request.GetEntityValues(),
seriesID,
}, request.GetRequest().GetDataPoint().GetTimestamp())
}
}
Expand Down Expand Up @@ -436,6 +441,7 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// groupBy tag values as v3
nil,
dpWithEvs.seriesID,
}
}, nil
}
Expand All @@ -458,6 +464,7 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
transform(groupLocator, func(locator partition.TagLocator) *modelv1.TagValue {
return extractTagValue(dpWithEvs.DataPointValue, locator)
}),
dpWithEvs.seriesID,
}
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamilies)

if stm.processorManager != nil {
stm.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{
stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
Metadata: stm.GetSchema().Metadata,
DataPoint: req.DataPoint,
Expand Down
46 changes: 26 additions & 20 deletions pkg/flow/streaming/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
g "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"

"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
flowTest "github.com/apache/skywalking-banyandb/pkg/test/flow"
Expand Down Expand Up @@ -146,14 +147,17 @@ var _ = g.Describe("Streaming", func() {
f = New("test", flowTest.NewSlice(input)).
Map(flow.UnaryFunc[any](func(_ context.Context, item interface{}) interface{} {
// groupBy
return flow.Data{item.(*record).service, int64(item.(*record).value)}
return flow.Data{item.(*record).service, int64(item.(*record).value), item.(*record).service + item.(*record).instance}
})).
Window(NewTumblingTimeWindows(15*time.Second, 15*time.Second)).
TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
}), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[0].(string)
})).
TopN(3, WithKeyExtractor(func(record flow.StreamRecord) uint64 {
return convert.HashStr(record.Data().(flow.Data)[2].(string))
}),
WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
}), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[0].(string)
})).
To(snk)

errCh = f.Open()
Expand All @@ -179,15 +183,15 @@ var _ = g.Describe("Streaming", func() {
g.Expect(len(snk.Value())).Should(gomega.BeNumerically(">=", 1))
// e2e-service-consumer Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500)}, 7000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500), "e2e-service-consumerinstance-001"}, 7000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600), "e2e-service-consumerinstance-004"}, 6000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700), "e2e-service-consumerinstance-002"}, 4000)},
}))
// e2e-service-provider Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700), "e2e-service-providerinstance-003"}, 5000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800), "e2e-service-providerinstance-002"}, 3000)},
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000), "e2e-service-providerinstance-001"}, 1000)},
}))
}).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed())
})
Expand All @@ -209,10 +213,12 @@ var _ = g.Describe("Streaming", func() {
f = New("test", flowTest.NewSlice(input)).
Map(flow.UnaryFunc[any](func(_ context.Context, item interface{}) interface{} {
// groupBy
return flow.Data{item.(*record).service, int64(item.(*record).value)}
return flow.Data{item.(*record).service, int64(item.(*record).value), item.(*record).service + item.(*record).instance}
})).
Window(NewTumblingTimeWindows(15*time.Second, 15*time.Second)).
TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
TopN(3, WithKeyExtractor(func(record flow.StreamRecord) uint64 {
return convert.HashStr(record.Data().(flow.Data)[2].(string))
}), WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
}), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[0].(string)
Expand Down Expand Up @@ -242,15 +248,15 @@ var _ = g.Describe("Streaming", func() {
g.Expect(len(snk.Value())).Should(gomega.BeNumerically(">=", 1))
// e2e-service-consumer Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(9900), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9900)}, 2000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700), "e2e-service-consumerinstance-002"}, 4000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600), "e2e-service-consumerinstance-004"}, 6000)},
{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500), "e2e-service-consumerinstance-001"}, 7000)},
}))
// e2e-service-provider Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000), "e2e-service-providerinstance-001"}, 1000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800), "e2e-service-providerinstance-002"}, 3000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700), "e2e-service-providerinstance-003"}, 5000)},
}))
}).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed())
})
Expand Down
83 changes: 68 additions & 15 deletions pkg/flow/streaming/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {

type topNAggregatorGroup struct {
aggregatorGroup map[string]*topNAggregator
keyExtractor func(flow.StreamRecord) uint64
sortKeyExtractor func(flow.StreamRecord) int64
groupKeyExtractor func(flow.StreamRecord) string
comparator utils.Comparator
Expand All @@ -86,9 +87,9 @@ type topNAggregatorGroup struct {

type topNAggregator struct {
*topNAggregatorGroup
treeMap *treemap.Map
currentTopNum int
dirty bool
treeMap *treemap.Map
dict map[uint64]int64
dirty bool
}

// TopNOption is the option to set up a top-n aggregator group.
Expand All @@ -101,6 +102,13 @@ func WithSortKeyExtractor(sortKeyExtractor func(flow.StreamRecord) int64) TopNOp
}
}

// WithKeyExtractor sets a closure to extract the key.
func WithKeyExtractor(keyExtractor func(flow.StreamRecord) uint64) TopNOption {
return func(aggregator *topNAggregatorGroup) {
aggregator.keyExtractor = keyExtractor
}
}

// WithGroupKeyExtractor extract group key from the StreamRecord.
func WithGroupKeyExtractor(groupKeyExtractor func(flow.StreamRecord) string) TopNOption {
return func(aggregator *topNAggregatorGroup) {
Expand All @@ -117,14 +125,16 @@ func OrderBy(sort TopNSort) TopNOption {

func (t *topNAggregatorGroup) Add(input []flow.StreamRecord) {
for _, item := range input {
key := t.keyExtractor(item)
sortKey := t.sortKeyExtractor(item)
groupKey := t.groupKeyExtractor(item)
aggregator := t.getOrCreateGroup(groupKey)
aggregator.removeExistedItem(key)
if aggregator.checkSortKeyInBufferRange(sortKey) {
if e := t.l.Debug(); e.Enabled() {
e.Str("group", groupKey).Time("elem_ts", time.Unix(0, item.TimestampMillis()*int64(time.Millisecond))).Msg("put into topN buffer")
e.Str("group", groupKey).Uint64("key", key).Time("elem_ts", time.Unix(0, item.TimestampMillis()*int64(time.Millisecond))).Msg("put into topN buffer")
}
aggregator.put(sortKey, item)
aggregator.put(key, sortKey, item)
aggregator.doCleanUp()
}
}
Expand All @@ -138,7 +148,7 @@ func (t *topNAggregatorGroup) Snapshot() interface{} {
}
aggregator.dirty = false
iter := aggregator.treeMap.Iterator()
items := make([]*Tuple2, 0, aggregator.currentTopNum)
items := make([]*Tuple2, 0, aggregator.size())
for iter.Next() {
list := iter.Value().([]interface{})
for _, item := range list {
Expand Down Expand Up @@ -180,35 +190,35 @@ func (t *topNAggregatorGroup) getOrCreateGroup(group string) *topNAggregator {
t.aggregatorGroup[group] = &topNAggregator{
topNAggregatorGroup: t,
treeMap: treemap.NewWith(t.comparator),
dict: make(map[uint64]int64),
}
return t.aggregatorGroup[group]
}

func (t *topNAggregator) doCleanUp() {
// do cleanup: maintain the treeMap windowSize
if t.currentTopNum > t.cacheSize {
if t.size() > t.cacheSize {
lastKey, lastValues := t.treeMap.Max()
size := len(lastValues.([]interface{}))
l := lastValues.([]interface{})
delete(t.dict, t.keyExtractor(l[len(l)-1].(flow.StreamRecord)))
// remove last one
if size <= 1 {
t.currentTopNum -= size
if len(l) <= 1 {
t.treeMap.Remove(lastKey)
} else {
t.currentTopNum--
t.treeMap.Put(lastKey, lastValues.([]interface{})[0:size-1])
t.treeMap.Put(lastKey, l[:len(l)-1])
}
}
}

func (t *topNAggregator) put(sortKey int64, data flow.StreamRecord) {
t.currentTopNum++
func (t *topNAggregator) put(key uint64, sortKey int64, data flow.StreamRecord) {
t.dirty = true
if existingList, ok := t.treeMap.Get(sortKey); ok {
existingList = append(existingList.([]interface{}), data)
t.treeMap.Put(sortKey, existingList)
} else {
t.treeMap.Put(sortKey, []interface{}{data})
}
t.dict[key] = sortKey
}

func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool {
Expand All @@ -223,7 +233,50 @@ func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool {
if t.comparator(sortKey, worstKey.(int64)) < 0 {
return true
}
return t.currentTopNum < t.cacheSize
return t.size() < t.cacheSize
}

func (t *topNAggregator) removeExistedItem(key uint64) {
existed, ok := t.dict[key]
if !ok {
return
}
delete(t.dict, key)
list, ok := t.treeMap.Get(existed)
if !ok {
return
}
l := list.([]interface{})
for i := 0; i < len(l); i++ {
if t.keyExtractor(l[i].(flow.StreamRecord)) == key {
l = append(l[:i], l[i+1:]...)
}
}
if len(l) == 0 {
t.treeMap.Remove(existed)
return
}
t.treeMap.Put(existed, l)
}

func (t *topNAggregator) size() int {
return len(t.dict)
}

func (t *topNAggregatorGroup) leakCheck() {
for g, agg := range t.aggregatorGroup {
if agg.size() > t.cacheSize {
panic(g + "leak detected: topN buffer size exceed the cache size")
}
iter := agg.treeMap.Iterator()
count := 0
for iter.Next() {
count += len(iter.Value().([]interface{}))
}
if count != agg.size() {
panic(g + "leak detected: treeMap size not match dictionary size")
}
}
}

// Tuple2 is a tuple with 2 fields. Each field may be a separate type.
Expand Down
Loading

0 comments on commit 267e05b

Please sign in to comment.