Several Changes about Index and Document (#554)
* Remove sortable field from stored field
* Update kubernetes install document
* Remove invalid setting file

Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
hanahmily authored Nov 5, 2024
1 parent d5ec659 commit 2bc6b0a
Showing 10 changed files with 239 additions and 119 deletions.
31 changes: 0 additions & 31 deletions .air.toml

This file was deleted.

2 changes: 2 additions & 0 deletions CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
 ### Features
 
 - Add the `bydbctl analyze series` command to analyze the series data.
+- Index: Remove sortable field from the stored field. If a field is sortable only, it won't be stored.
 
 ### Bug Fixes
 
@@ -15,6 +16,7 @@ Release Notes.
 ### Documentation
 
 - Improve the description of the memory in observability doc.
+- Update kubernetes install document to align with the banyandb helm v0.3.0.
 
 ### Chores
 
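
The sortable-versus-stored distinction described in the changelog entry above maps directly onto Bluge field options. The sketch below is a minimal illustration, not code from this commit; it relies only on the Bluge calls already visible in this diff (`NewKeywordFieldBytes`, `Sortable`, `StoreValue`), and the document ID and field names are made up.

```go
package example

import "github.com/blugelabs/bluge"

// buildDoc shows the two field flavors this change distinguishes:
// a sortable-only field contributes doc values for ordering results,
// while a stored field keeps its raw bytes retrievable after a search.
func buildDoc() *bluge.Document {
	doc := bluge.NewDocument("example-doc")
	// Sortable only: usable as a sort key, no longer written to the stored section.
	doc.AddField(bluge.NewKeywordFieldBytes("latency", []byte("42")).Sortable())
	// Stored and sortable: retrievable via VisitStoredFields and usable as a sort key.
	doc.AddField(bluge.NewKeywordFieldBytes("service", []byte("gateway")).StoreValue().Sortable())
	return doc
}
```

With this split, a field that callers only ever sort by no longer spends bytes in the stored section, which is the saving the changelog entry refers to.
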
2 changes: 1 addition & 1 deletion banyand/internal/storage/index.go
@@ -245,7 +245,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
 	if err != nil {
 		return nil, nil, err
 	}
-	iter, err := s.store.Iterator(ctx, fieldKey, rangeOpts,
+	iter, err := s.store.SeriesSort(ctx, fieldKey, rangeOpts,
 		opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
 	if err != nil {
 		return nil, nil, err
10 changes: 6 additions & 4 deletions banyand/measure/write.go
@@ -163,8 +163,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
 					IndexRuleID: r.GetMetadata().GetId(),
 					Analyzer:    r.Analyzer,
 				},
-				Term:  encodeTagValue.value,
-				Store: true,
+				Term:   encodeTagValue.value,
+				Store:  true,
+				NoSort: r.GetNoSort(),
 			})
 		} else {
 			for _, val := range encodeTagValue.valueArr {
@@ -173,8 +174,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
 						IndexRuleID: r.GetMetadata().GetId(),
 						Analyzer:    r.Analyzer,
 					},
-					Term:  val,
-					Store: true,
+					Term:   val,
+					Store:  true,
+					NoSort: r.GetNoSort(),
 				})
 			}
 		}
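
Both hunks above thread the index rule's `GetNoSort()` result into the fields emitted by the write callback. The helper below is a hedged illustration of that propagation rather than code from the commit: `buildIndexField` is a hypothetical name, and it assumes the outer literal in the hunks is `index.Field` with a nested `index.FieldKey`, as the visible fields suggest.

```go
package example

import "github.com/apache/skywalking-banyandb/pkg/index"

// buildIndexField mirrors the write callback above: the rule's analyzer and
// NoSort flag travel with every field, so a rule that is only used for sorting
// can skip the stored section while staying sortable.
// Hypothetical helper; the index.Field layout is assumed from the diff.
func buildIndexField(ruleID uint32, analyzer string, term []byte, noSort bool) index.Field {
	return index.Field{
		Key: index.FieldKey{
			IndexRuleID: ruleID,
			Analyzer:    analyzer,
		},
		Term:   term,
		Store:  true,
		NoSort: noSort,
	}
}
```
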
8 changes: 4 additions & 4 deletions docs/installation/kubernetes.md
@@ -35,8 +35,8 @@ kubectl create ns sw
 ```shell
 helm install banyandb \
   oci://registry-1.docker.io/apache/skywalking-banyandb-helm \
-  --version 0.2.0 \
-  --set image.tag=0.6.1 \
+  --version 0.3.0 \
+  --set image.tag=0.7.0 \
   --set standalone.enabled=true \
   --set cluster.enabled=false \
   --set etcd.enabled=false \
@@ -84,8 +84,8 @@ At the same time, the BanyanDB server would be listening on the `0.0.0.0:17913`
 ```shell
 helm install banyandb \
   oci://registry-1.docker.io/apache/skywalking-banyandb-helm \
-  --version 0.2.0 \
-  --set image.tag=0.6.1 \
+  --version 0.3.0 \
+  --set image.tag=0.7.0 \
   --set standalone.enabled=false \
   --set cluster.enabled=true \
   --set etcd.enabled=true \
4 changes: 3 additions & 1 deletion pkg/index/index.go
@@ -175,7 +175,7 @@ type Writer interface {
 type FieldIterable interface {
 	BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error)
 	Iterator(ctx context.Context, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort,
-		preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error)
+		preLoadSize int) (iter FieldIterator[*DocumentResult], err error)
 	Sort(ctx context.Context, sids []common.SeriesID, fieldKey FieldKey,
 		order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error)
 }
@@ -229,6 +229,8 @@ type SeriesStore interface {
 	// Search returns a list of series that match the given matchers.
 	Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error)
 	SeriesIterator(context.Context) (FieldIterator[Series], error)
+	SeriesSort(ctx context.Context, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort,
+		preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error)
 }
 
 // SeriesMatcherType represents the type of series matcher.
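
A hedged sketch of how the reshaped interfaces read from a caller's side, paralleling the `banyand/internal/storage` change earlier in this commit: sorted, projected series queries now go through `SeriesStore.SeriesSort`, while `FieldIterable.Iterator` keeps only the plain term-range form. The helper name is invented, the import paths are assumed from the repository layout, and the `Next`/`Val`/`Close` iteration contract of `FieldIterator` is an assumption rather than something shown in this diff.

```go
package example

import (
	"context"

	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/pkg/index"
)

// collectSorted drains a SeriesSort iterator into a slice. It assumes the
// FieldIterator contract of Next/Val/Close; results are copied because the
// iterator may reuse its current value between calls.
func collectSorted(ctx context.Context, store index.SeriesStore, fieldKey index.FieldKey,
	termRange index.RangeOpts, query index.Query, projection []index.FieldKey, preload int,
) ([]index.DocumentResult, error) {
	iter, err := store.SeriesSort(ctx, fieldKey, termRange, modelv1.Sort_SORT_ASC, preload, query, projection)
	if err != nil {
		return nil, err
	}
	defer iter.Close()

	var results []index.DocumentResult
	for iter.Next() {
		results = append(results, *iter.Val())
	}
	return results, nil
}
```
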
104 changes: 44 additions & 60 deletions pkg/index/inverted/inverted.go
@@ -121,7 +121,7 @@ func (s *store) Batch(batch index.Batch) error {
 		for _, f := range d.Fields {
 			tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.Term)
 			if !f.NoSort {
-				tf.StoreValue().Sortable()
+				tf.Sortable()
 			}
 			if f.Store {
 				tf.StoreValue()
@@ -178,7 +178,7 @@ func (s *store) Close() error {
 }
 
 func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort,
-	preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey,
+	preLoadSize int,
 ) (iter index.FieldIterator[*index.DocumentResult], err error) {
 	if termRange.Lower != nil &&
 		termRange.Upper != nil &&
@@ -229,25 +229,14 @@ func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey, termRange
 	if order == modelv1.Sort_SORT_DESC {
 		sortedKey = "-" + sortedKey
 	}
-	query := bluge.NewBooleanQuery().AddMust(rangeQuery)
-	node := newMustNode()
-	node.Append(rangeNode)
-	if indexQuery != nil && indexQuery.(*queryNode).query != nil {
-		query.AddMust(indexQuery.(*queryNode).query)
-		node.Append(indexQuery.(*queryNode).node)
-	}
-	fields := make([]string, 0, len(fieldKeys))
-	for i := range fieldKeys {
-		fields = append(fields, fieldKeys[i].Marshal())
-	}
 	result := &sortIterator{
-		query:     &queryNode{query, node},
-		fields:    fields,
-		reader:    reader,
-		sortedKey: sortedKey,
-		size:      preLoadSize,
-		closer:    s.closer,
-		ctx:       ctx,
+		query:       &queryNode{rangeQuery, rangeNode},
+		reader:      reader,
+		sortedKey:   sortedKey,
+		size:        preLoadSize,
+		closer:      s.closer,
+		ctx:         ctx,
+		newIterator: newBlugeMatchIterator,
 	}
 	return result, nil
 }
@@ -333,7 +322,7 @@ func getMatchOptions(analyzerOnIndexRule string, opts *modelv1.Condition_MatchOp
 }
 
 func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
-	iter, err := s.Iterator(context.TODO(), fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize, nil, nil)
+	iter, err := s.Iterator(context.TODO(), fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize)
 	if err != nil {
 		return roaring.DummyPostingList, err
 	}
@@ -346,25 +335,22 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
 }
 
 type blugeMatchIterator struct {
-	delegated        search.DocumentMatchIterator
-	err              error
-	closer           io.Closer
-	needToLoadFields []string
-	current          index.DocumentResult
-	hit              int
+	delegated search.DocumentMatchIterator
+	err       error
+	closer    io.Closer
+	ctx       *search.Context
+	current   index.DocumentResult
+	hit       int
 }
 
 func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer,
-	needToLoadFields []string,
-) blugeMatchIterator {
-	bmi := blugeMatchIterator{
-		delegated:        delegated,
-		closer:           closer,
-		needToLoadFields: needToLoadFields,
-		current:          index.DocumentResult{Values: make(map[string][]byte, len(needToLoadFields))},
-	}
-	for _, f := range needToLoadFields {
-		bmi.current.Values[f] = nil
+	_ []string,
+) blugeIterator {
+	bmi := &blugeMatchIterator{
+		delegated: delegated,
+		closer:    closer,
+		current:   index.DocumentResult{},
+		ctx:       search.NewSearchContext(1, 0),
 	}
 	return bmi
 }
@@ -391,32 +377,30 @@ func (bmi *blugeMatchIterator) Next() bool {
 	if len(match.SortValue) > 0 {
 		bmi.current.SortedValue = match.SortValue[0]
 	}
-	err := match.VisitStoredFields(func(field string, value []byte) bool {
-		switch field {
-		case entityField:
-			bmi.current.EntityValues = value
-		case docIDField:
-			bmi.current.DocID = convert.BytesToUint64(value)
-		case seriesIDField:
-			bmi.current.SeriesID = common.SeriesID(convert.BytesToUint64(value))
-		case timestampField:
-			ts, err := bluge.DecodeDateTime(value)
-			if err != nil {
-				bmi.err = err
-				return false
-			}
-			bmi.current.Timestamp = ts.UnixNano()
-		default:
-			if _, ok := bmi.current.Values[field]; ok {
-				bmi.current.Values[field] = bytes.Clone(value)
-			}
-		}
-		return true
-	})
-	bmi.err = errors.WithMessagef(err, "visit stored fields, hit: %d", bmi.hit)
+	err := match.VisitStoredFields(bmi.setVal)
+	bmi.err = multierr.Combine(bmi.err, err)
 	return bmi.err == nil
 }
 
+func (bmi *blugeMatchIterator) setVal(field string, value []byte) bool {
+	switch field {
+	case docIDField:
+		bmi.current.DocID = convert.BytesToUint64(value)
+	case seriesIDField:
+		bmi.current.SeriesID = common.SeriesID(convert.BytesToUint64(value))
+	case timestampField:
+		ts, errTime := bluge.DecodeDateTime(value)
+		if errTime != nil {
+			bmi.err = errTime
+			return false
+		}
+		bmi.current.Timestamp = ts.UnixNano()
+	default:
+		bmi.err = errors.Errorf("unexpected field: %s", field)
+	}
+	return true
+}
+
 func (bmi *blugeMatchIterator) Val() index.DocumentResult {
 	return bmi.current
 }
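
The refactored iterator above reads hits back through `VisitStoredFields`, and with sortable-only fields no longer stored it expects only the reserved keys (`docIDField`, `seriesIDField`, `timestampField`). The standalone sketch below shows the underlying Bluge pattern; it uses only public Bluge APIs (`Reader.Search`, `NewAllMatches`, `VisitStoredFields`), and the query and printed output are illustrative.

```go
package example

import (
	"context"
	"fmt"

	"github.com/blugelabs/bluge"
)

// printStoredFields runs a query and prints every stored field of each hit.
// Only fields indexed with StoreValue() show up in VisitStoredFields; a field
// that was merely Sortable() never appears here, which is why the iterator
// above now treats unknown stored fields as an error.
func printStoredFields(ctx context.Context, reader *bluge.Reader, q bluge.Query) error {
	it, err := reader.Search(ctx, bluge.NewAllMatches(q))
	if err != nil {
		return err
	}
	match, err := it.Next()
	for err == nil && match != nil {
		if visitErr := match.VisitStoredFields(func(field string, value []byte) bool {
			fmt.Printf("stored %s=%q\n", field, value)
			return true // keep visiting the remaining stored fields
		}); visitErr != nil {
			return visitErr
		}
		match, err = it.Next()
	}
	return err
}
```
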