From 2bc6b0aa9d6f9b0ee79fb68d0a4be2360cf37c72 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Tue, 5 Nov 2024 22:26:46 +0800 Subject: [PATCH] Several Changes about Index and Document (#554) * Remove sortable field from stored field * Update kubernetes install document * Remove invalid setting file Signed-off-by: Gao Hongtao --- .air.toml | 31 ------ CHANGES.md | 2 + banyand/internal/storage/index.go | 2 +- banyand/measure/write.go | 10 +- docs/installation/kubernetes.md | 8 +- pkg/index/index.go | 4 +- pkg/index/inverted/inverted.go | 104 ++++++++--------- pkg/index/inverted/inverted_series.go | 153 ++++++++++++++++++++++++++ pkg/index/inverted/sort.go | 42 ++++--- pkg/index/testcases/duration.go | 2 +- 10 files changed, 239 insertions(+), 119 deletions(-) delete mode 100644 .air.toml diff --git a/.air.toml b/.air.toml deleted file mode 100644 index 153646870..000000000 --- a/.air.toml +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Config file for [Air](https://github.com/cosmtrek/air) in TOML format - -# Working directory -# . or absolute path, please note that the directories following must be under root. -root = "." -tmp_dir = "tmp" - -[build] -# Just plain old shell command. You could use `make` as well. -cmd = "go build -gcflags='all=-N -l' -buildvcs=false -o ./tmp/main ./banyand/cmd/server" -# Binary file yields from `cmd`. -bin = "tmp/main" -# Customize binary. -full_bin = "dlv exec --accept-multiclient --log --headless --continue --listen :2345 --api-version 2 ./tmp/main standalone" -# Watch these filename extensions. -include_ext = ["go"] diff --git a/CHANGES.md b/CHANGES.md index e52ab56f2..3aaf6ffda 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ Release Notes. ### Features - Add the `bydbctl analyze series` command to analyze the series data. +- Index: Remove sortable field from the stored field. If a field is sortable only, it won't be stored. ### Bug Fixes @@ -15,6 +16,7 @@ Release Notes. ### Documentation - Improve the description of the memory in observability doc. +- Update kubernetes install document to align the banyandb helm v0.3.0. ### Chores diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index af6b6424c..2e1ad14b8 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -245,7 +245,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In if err != nil { return nil, nil, err } - iter, err := s.store.Iterator(ctx, fieldKey, rangeOpts, + iter, err := s.store.SeriesSort(ctx, fieldKey, rangeOpts, opts.Order.Sort, opts.PreloadSize, query, opts.Projection) if err != nil { return nil, nil, err diff --git a/banyand/measure/write.go b/banyand/measure/write.go index 2eaaffddd..bb6165d38 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -163,8 +163,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me IndexRuleID: r.GetMetadata().GetId(), Analyzer: r.Analyzer, }, - Term: encodeTagValue.value, - Store: true, + Term: encodeTagValue.value, + Store: true, + NoSort: r.GetNoSort(), }) } else { for _, val := range encodeTagValue.valueArr { @@ -173,8 +174,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me IndexRuleID: r.GetMetadata().GetId(), Analyzer: r.Analyzer, }, - Term: val, - Store: true, + Term: val, + Store: true, + NoSort: r.GetNoSort(), }) } } diff --git a/docs/installation/kubernetes.md b/docs/installation/kubernetes.md index b3115f7bb..409d2a1cd 100644 --- a/docs/installation/kubernetes.md +++ b/docs/installation/kubernetes.md @@ -35,8 +35,8 @@ kubectl create ns sw ```shell helm install banyandb \ oci://registry-1.docker.io/apache/skywalking-banyandb-helm \ - --version 0.2.0 \ - --set image.tag=0.6.1 \ + --version 0.3.0 \ + --set image.tag=0.7.0 \ --set standalone.enabled=true \ --set cluster.enabled=false \ --set etcd.enabled=false \ @@ -84,8 +84,8 @@ At the same time, the BanyanDB server would be listening on the `0.0.0.0:17913` ```shell helm install banyandb \ oci://registry-1.docker.io/apache/skywalking-banyandb-helm \ - --version 0.2.0 \ - --set image.tag=0.6.1 \ + --version 0.3.0 \ + --set image.tag=0.7.0 \ --set standalone.enabled=false \ --set cluster.enabled=true \ --set etcd.enabled=true \ diff --git a/pkg/index/index.go b/pkg/index/index.go index 1f1565fad..035fbf83d 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -175,7 +175,7 @@ type Writer interface { type FieldIterable interface { BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error) Iterator(ctx context.Context, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, - preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error) + preLoadSize int) (iter FieldIterator[*DocumentResult], err error) Sort(ctx context.Context, sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error) } @@ -229,6 +229,8 @@ type SeriesStore interface { // Search returns a list of series that match the given matchers. Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error) SeriesIterator(context.Context) (FieldIterator[Series], error) + SeriesSort(ctx context.Context, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, + preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error) } // SeriesMatcherType represents the type of series matcher. diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 5d35d6aa6..094f405be 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -121,7 +121,7 @@ func (s *store) Batch(batch index.Batch) error { for _, f := range d.Fields { tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.Term) if !f.NoSort { - tf.StoreValue().Sortable() + tf.Sortable() } if f.Store { tf.StoreValue() @@ -178,7 +178,7 @@ func (s *store) Close() error { } func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, - preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey, + preLoadSize int, ) (iter index.FieldIterator[*index.DocumentResult], err error) { if termRange.Lower != nil && termRange.Upper != nil && @@ -229,25 +229,14 @@ func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey, termRange if order == modelv1.Sort_SORT_DESC { sortedKey = "-" + sortedKey } - query := bluge.NewBooleanQuery().AddMust(rangeQuery) - node := newMustNode() - node.Append(rangeNode) - if indexQuery != nil && indexQuery.(*queryNode).query != nil { - query.AddMust(indexQuery.(*queryNode).query) - node.Append(indexQuery.(*queryNode).node) - } - fields := make([]string, 0, len(fieldKeys)) - for i := range fieldKeys { - fields = append(fields, fieldKeys[i].Marshal()) - } result := &sortIterator{ - query: &queryNode{query, node}, - fields: fields, - reader: reader, - sortedKey: sortedKey, - size: preLoadSize, - closer: s.closer, - ctx: ctx, + query: &queryNode{rangeQuery, rangeNode}, + reader: reader, + sortedKey: sortedKey, + size: preLoadSize, + closer: s.closer, + ctx: ctx, + newIterator: newBlugeMatchIterator, } return result, nil } @@ -333,7 +322,7 @@ func getMatchOptions(analyzerOnIndexRule string, opts *modelv1.Condition_MatchOp } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) { - iter, err := s.Iterator(context.TODO(), fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize, nil, nil) + iter, err := s.Iterator(context.TODO(), fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize) if err != nil { return roaring.DummyPostingList, err } @@ -346,25 +335,22 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti } type blugeMatchIterator struct { - delegated search.DocumentMatchIterator - err error - closer io.Closer - needToLoadFields []string - current index.DocumentResult - hit int + delegated search.DocumentMatchIterator + err error + closer io.Closer + ctx *search.Context + current index.DocumentResult + hit int } func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer, - needToLoadFields []string, -) blugeMatchIterator { - bmi := blugeMatchIterator{ - delegated: delegated, - closer: closer, - needToLoadFields: needToLoadFields, - current: index.DocumentResult{Values: make(map[string][]byte, len(needToLoadFields))}, - } - for _, f := range needToLoadFields { - bmi.current.Values[f] = nil + _ []string, +) blugeIterator { + bmi := &blugeMatchIterator{ + delegated: delegated, + closer: closer, + current: index.DocumentResult{}, + ctx: search.NewSearchContext(1, 0), } return bmi } @@ -391,32 +377,30 @@ func (bmi *blugeMatchIterator) Next() bool { if len(match.SortValue) > 0 { bmi.current.SortedValue = match.SortValue[0] } - err := match.VisitStoredFields(func(field string, value []byte) bool { - switch field { - case entityField: - bmi.current.EntityValues = value - case docIDField: - bmi.current.DocID = convert.BytesToUint64(value) - case seriesIDField: - bmi.current.SeriesID = common.SeriesID(convert.BytesToUint64(value)) - case timestampField: - ts, err := bluge.DecodeDateTime(value) - if err != nil { - bmi.err = err - return false - } - bmi.current.Timestamp = ts.UnixNano() - default: - if _, ok := bmi.current.Values[field]; ok { - bmi.current.Values[field] = bytes.Clone(value) - } - } - return true - }) - bmi.err = errors.WithMessagef(err, "visit stored fields, hit: %d", bmi.hit) + err := match.VisitStoredFields(bmi.setVal) + bmi.err = multierr.Combine(bmi.err, err) return bmi.err == nil } +func (bmi *blugeMatchIterator) setVal(field string, value []byte) bool { + switch field { + case docIDField: + bmi.current.DocID = convert.BytesToUint64(value) + case seriesIDField: + bmi.current.SeriesID = common.SeriesID(convert.BytesToUint64(value)) + case timestampField: + ts, errTime := bluge.DecodeDateTime(value) + if errTime != nil { + bmi.err = errTime + return false + } + bmi.current.Timestamp = ts.UnixNano() + default: + bmi.err = errors.Errorf("unexpected field: %s", field) + } + return true +} + func (bmi *blugeMatchIterator) Val() index.DocumentResult { return bmi.current } diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 25143e592..fcf8f4672 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -21,6 +21,7 @@ package inverted import ( "bytes" "context" + "io" "github.com/blugelabs/bluge" "github.com/blugelabs/bluge/search" @@ -29,6 +30,7 @@ import ( "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" ) @@ -164,6 +166,157 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey return result, nil } +func (s *store) SeriesSort(ctx context.Context, fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, + preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey, +) (iter index.FieldIterator[*index.DocumentResult], err error) { + if termRange.Lower != nil && + termRange.Upper != nil && + bytes.Compare(termRange.Lower, termRange.Upper) > 0 { + return index.DummyFieldIterator, nil + } + if !s.closer.AddRunning() { + return nil, nil + } + + reader, err := s.writer.Reader() + if err != nil { + return nil, err + } + fk := fieldKey.Marshal() + rangeQuery := bluge.NewBooleanQuery() + rangeNode := newMustNode() + addRange := func(query *bluge.BooleanQuery, termRange index.RangeOpts) *bluge.BooleanQuery { + if termRange.Upper == nil { + termRange.Upper = defaultUpper + } + if termRange.Lower == nil { + termRange.Lower = defaultLower + } + query.AddMust(bluge.NewTermRangeInclusiveQuery( + string(termRange.Lower), + string(termRange.Upper), + termRange.IncludesLower, + termRange.IncludesUpper, + ). + SetField(fk)) + rangeNode.Append(newTermRangeInclusiveNode(string(termRange.Lower), string(termRange.Upper), termRange.IncludesLower, termRange.IncludesUpper, nil)) + return query + } + + if fieldKey.HasSeriesID() { + rangeQuery = rangeQuery.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())). + SetField(seriesIDField)) + rangeNode.Append(newTermNode(string(fieldKey.SeriesID.Marshal()), nil)) + if termRange.Lower != nil || termRange.Upper != nil { + rangeQuery = addRange(rangeQuery, termRange) + } + } else { + rangeQuery = addRange(rangeQuery, termRange) + } + + sortedKey := fk + if order == modelv1.Sort_SORT_DESC { + sortedKey = "-" + sortedKey + } + query := bluge.NewBooleanQuery().AddMust(rangeQuery) + node := newMustNode() + node.Append(rangeNode) + if indexQuery != nil && indexQuery.(*queryNode).query != nil { + query.AddMust(indexQuery.(*queryNode).query) + node.Append(indexQuery.(*queryNode).node) + } + fields := make([]string, 0, len(fieldKeys)) + for i := range fieldKeys { + fields = append(fields, fieldKeys[i].Marshal()) + } + result := &sortIterator{ + query: &queryNode{query, node}, + fields: fields, + reader: reader, + sortedKey: sortedKey, + size: preLoadSize, + closer: s.closer, + ctx: ctx, + newIterator: newSeriesIterator, + } + return result, nil +} + +type seriesIterator struct { + *blugeMatchIterator + needToLoadFields []string +} + +func newSeriesIterator(delegated search.DocumentMatchIterator, closer io.Closer, + needToLoadFields []string, +) blugeIterator { + si := &seriesIterator{ + blugeMatchIterator: &blugeMatchIterator{ + delegated: delegated, + closer: closer, + ctx: search.NewSearchContext(1, 0), + current: index.DocumentResult{Values: make(map[string][]byte, len(needToLoadFields))}, + }, + needToLoadFields: append(needToLoadFields, entityField, docIDField, seriesIDField, timestampField), + } + for _, f := range needToLoadFields { + si.current.Values[f] = nil + } + return si +} + +func (si *seriesIterator) Next() bool { + var match *search.DocumentMatch + match, si.err = si.delegated.Next() + if si.err != nil { + si.err = errors.WithMessagef(si.err, "failed to get next document, hit: %d", si.hit) + return false + } + if match == nil { + si.err = io.EOF + return false + } + si.hit = match.HitNumber + for i := range si.current.Values { + si.current.Values[i] = nil + } + si.current.DocID = 0 + si.current.SeriesID = 0 + si.current.Timestamp = 0 + si.current.SortedValue = nil + if len(match.SortValue) > 0 { + si.current.SortedValue = match.SortValue[0] + } + + err := match.VisitStoredFields(si.setVal) + si.err = multierr.Combine(si.err, err) + if si.err != nil { + return false + } + return si.err == nil +} + +func (si *seriesIterator) setVal(field string, value []byte) bool { + switch field { + case entityField: + si.current.EntityValues = value + case docIDField: + si.current.DocID = convert.BytesToUint64(value) + case timestampField: + ts, errTime := bluge.DecodeDateTime(value) + if errTime != nil { + si.err = errTime + return false + } + si.current.Timestamp = ts.UnixNano() + default: + if _, ok := si.current.Values[field]; ok { + si.current.Values[field] = bytes.Clone(value) + } + } + return true +} + func (s *store) SeriesIterator(ctx context.Context) (index.FieldIterator[index.Series], error) { reader, err := s.writer.Reader() if err != nil { diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index e7dfd310b..68071cc90 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -25,6 +25,7 @@ import ( "math" "github.com/blugelabs/bluge" + "github.com/blugelabs/bluge/search" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -69,26 +70,34 @@ func (s *store) Sort(ctx context.Context, sids []common.SeriesID, fieldKey index sortedKey = "-" + sortedKey } result := &sortIterator{ - query: &queryNode{query: query}, - reader: reader, - sortedKey: sortedKey, - size: preLoadSize, - ctx: ctx, + query: &queryNode{query: query}, + reader: reader, + sortedKey: sortedKey, + size: preLoadSize, + ctx: ctx, + newIterator: newBlugeMatchIterator, } return result, nil } +type blugeIterator interface { + Next() bool + Val() index.DocumentResult + Close() error +} + type sortIterator struct { - query index.Query - err error - ctx context.Context - reader *bluge.Reader - current *blugeMatchIterator - closer *run.Closer - sortedKey string - fields []string - size int - skipped int + query index.Query + err error + ctx context.Context + current blugeIterator + reader *bluge.Reader + closer *run.Closer + newIterator func(delegated search.DocumentMatchIterator, closer io.Closer, needToLoadFields []string) blugeIterator + sortedKey string + fields []string + size int + skipped int } func (si *sortIterator) Next() bool { @@ -123,8 +132,7 @@ func (si *sortIterator) loadCurrent() bool { return false } - iter := newBlugeMatchIterator(documentMatchIterator, nil, si.fields) - si.current = &iter + si.current = si.newIterator(documentMatchIterator, nil, si.fields) if si.next() { return true } diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go index 842901561..a812cadfa 100644 --- a/pkg/index/testcases/duration.go +++ b/pkg/index/testcases/duration.go @@ -289,7 +289,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { t.Run(tt.name, func(t *testing.T) { tester := assert.New(t) is := require.New(t) - iter, err := store.Iterator(context.TODO(), tt.args.fieldKey, tt.args.termRange, tt.args.orderType, tt.preloadSize, nil, nil) + iter, err := store.Iterator(context.TODO(), tt.args.fieldKey, tt.args.termRange, tt.args.orderType, tt.preloadSize) is.NoError(err) if iter == nil { tester.Empty(tt.want)