Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Measure: Introduce "index_mode" #557

Merged
merged 8 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Release Notes.
- Add the `bydbctl analyze series` command to analyze the series data.
- Index: Remove sortable field from the stored field. If a field is sortable only, it won't be stored.
- Index: Support InsertIfAbsent functionality which ensures documents are only inserted if their docIDs are not already present in the current index. There is a exception for the documents with extra index fields more than the entity's index fields.
- Measure: Introduce "index_mode" to save data exclusively in the series index, ideal for non-timeseries measures.

### Bug Fixes

Expand Down
3 changes: 3 additions & 0 deletions api/proto/banyandb/database/v1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ message Measure {
string interval = 5;
// updated_at indicates when the measure is updated
google.protobuf.Timestamp updated_at = 6;
// index_mode specifies whether the data should be stored exclusively in the index,
// meaning it will not be stored in the data storage system.
bool index_mode = 7;
}

message MeasureAggregateFunction {
Expand Down
3 changes: 3 additions & 0 deletions api/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func Measure(measure *databasev1.Measure) error {
if len(measure.TagFamilies) == 0 {
return errors.New("measure tag families is empty")
}
if measure.IndexMode && len(measure.Fields) > 0 {
return errors.New("index mode is enabled, but fields are not empty")
}
return tagFamily(measure.TagFamilies)
}

Expand Down
92 changes: 44 additions & 48 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

func (s *segment[T, O]) IndexDB() IndexDB {
return s.index
}

func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) {
sl, _, err := s.index.filter(ctx, series, nil, nil)
sl, _, _, err := s.index.filter(ctx, series, nil, nil, nil)
return sl, err
}

Expand Down Expand Up @@ -76,21 +77,19 @@ func (s *seriesIndex) Write(docs index.Documents) error {
})
}

var rangeOpts = index.RangeOpts{}

func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
projection []index.FieldKey, secondaryQuery index.Query,
) (sl pbv1.SeriesList, fields FieldResultList, err error) {
projection []index.FieldKey, secondaryQuery index.Query, timeRange *timestamp.TimeRange,
) (sl pbv1.SeriesList, fields FieldResultList, tss []int64, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}
indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery)
indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery, timeRange)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
tracer := query.GetTracer(ctx)
if tracer != nil {
Expand All @@ -109,13 +108,13 @@ func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
}
ss, err := s.store.Search(ctx, projection, indexQuery)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
sl, fields, err = convertIndexSeriesToSeriesList(ss, len(projection) > 0)
sl, fields, tss, err = convertIndexSeriesToSeriesList(ss, len(projection) > 0)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
return nil, nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
}
return sl, fields, nil
return sl, fields, tss, nil
}

var emptySeriesMatcher = index.SeriesMatcher{}
Expand Down Expand Up @@ -173,31 +172,38 @@ func convertEntityValuesToSeriesMatcher(series *pbv1.Series) (index.SeriesMatche
}, nil
}

func convertIndexSeriesToSeriesList(indexSeries []index.SeriesDocument, hasFields bool) (pbv1.SeriesList, FieldResultList, error) {
func convertIndexSeriesToSeriesList(indexSeries []index.SeriesDocument, hasFields bool) (pbv1.SeriesList, FieldResultList, []int64, error) {
seriesList := make(pbv1.SeriesList, 0, len(indexSeries))
var fields FieldResultList
if hasFields {
fields = make(FieldResultList, 0, len(indexSeries))
}
var timestamps []int64
for _, s := range indexSeries {
var series pbv1.Series
series.ID = s.Key.ID
if err := series.Unmarshal(s.Key.EntityValues); err != nil {
return nil, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", s.Key.EntityValues)
return nil, nil, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", s.Key.EntityValues)
}
seriesList = append(seriesList, &series)
if fields != nil {
fields = append(fields, s.Fields)
}
if s.Timestamp > 0 {
timestamps = append(timestamps, s.Timestamp)
}
}
return seriesList, fields, nil
return seriesList, fields, timestamps, nil
}

func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (sl pbv1.SeriesList, frl FieldResultList, err error) {
func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts,
) (sl pbv1.SeriesList, frl FieldResultList, tss []int64, sortedValues [][]byte, err error) {
tracer := query.GetTracer(ctx)
if tracer != nil {
var span *query.Span
span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search")
if opts.Query != nil {
span.Tagf("secondary_query", "%s", opts.Query.String())
}
defer func() {
if err != nil {
span.Error(err)
Expand All @@ -207,21 +213,11 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
}

if opts.Order == nil || opts.Order.Index == nil {
var seriesList pbv1.SeriesList
var fieldResultList FieldResultList
if opts.Query != nil {
seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, opts.Query)
} else {
seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, nil)
}
sl, frl, tss, err = s.filter(ctx, series, opts.Projection, opts.Query, opts.TimeRange)
if err != nil {
return nil, nil, err
return nil, nil, nil, nil, err
}
return seriesList, fieldResultList, nil
}

fieldKey := index.FieldKey{
IndexRuleID: opts.Order.Index.GetMetadata().Id,
return sl, frl, tss, nil, nil
}
var span *query.Span
if tracer != nil {
Expand All @@ -238,43 +234,43 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, nil, err
return nil, nil, nil, nil, err
}
}
query, err := s.store.BuildQuery(seriesMatchers, opts.Query)
query, err := s.store.BuildQuery(seriesMatchers, opts.Query, opts.TimeRange)
if err != nil {
return nil, nil, err
return nil, nil, nil, nil, err
}
iter, err := s.store.SeriesSort(ctx, fieldKey, rangeOpts,
opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
iter, err := s.store.SeriesSort(ctx, query, opts.Order,
opts.PreloadSize, opts.Projection)
if err != nil {
return nil, nil, err
return nil, nil, nil, nil, err
}
defer func() {
err = multierr.Append(err, iter.Close())
}()

var r int
result := make([]index.SeriesDocument, 0, 10)
for iter.Next() {
r++
val := iter.Val()
var doc index.SeriesDocument
doc.Fields = maps.Clone(val.Values)
doc.Key.ID = common.SeriesID(val.DocID)
doc.Key.EntityValues = val.EntityValues
result = append(result, doc)
}
sortedSeriesList, sortedFieldResultList, err := convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(result))
var series pbv1.Series
if err = series.Unmarshal(val.EntityValues); err != nil {
return nil, nil, nil, nil, errors.WithMessagef(err, "failed to unmarshal series: %s", val.EntityValues)
}
sl = append(sl, &series)
tss = append(tss, val.Timestamp)
if len(opts.Projection) > 0 {
frl = append(frl, maps.Clone(val.Values))
}
sortedValues = append(sortedValues, val.SortedValue)
}
if span != nil {
span.Tagf("query", "%s", iter.Query().String())
span.Tagf("rounds", "%d", r)
span.Tagf("size", "%d", len(sortedSeriesList))
span.Tagf("size", "%d", len(sl))
}
return sortedSeriesList, sortedFieldResultList, err
return sl, frl, tss, sortedValues, err
}

func (s *seriesIndex) Close() error {
Expand Down
4 changes: 3 additions & 1 deletion banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/apache/skywalking-banyandb/api/common"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand Down Expand Up @@ -157,11 +158,12 @@ func TestSeriesIndex_Primary(t *testing.T) {
seriesQuery.EntityValues = tt.entityValues[i]
seriesQueries = append(seriesQueries, &seriesQuery)
}
sl, _, err := si.filter(ctx, seriesQueries, nil, nil)
sl, _, _, err := si.filter(ctx, seriesQueries, nil, nil, nil)
require.NoError(t, err)
require.Equal(t, len(tt.entityValues), len(sl))
assert.Equal(t, tt.subject, sl[0].Subject)
for i := range tt.expected {
assert.Greater(t, sl[i].ID, common.SeriesID(0))
assert.Equal(t, tt.expected[i][0].GetStr().GetValue(), sl[i].EntityValues[0].GetStr().GetValue())
assert.Equal(t, tt.expected[i][1].GetStr().GetValue(), sl[i].EntityValues[1].GetStr().GetValue())
assert.True(t, sl[0].ID > 0)
Expand Down
3 changes: 3 additions & 0 deletions banyand/internal/storage/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange)
last := len(sc.lst) - 1
for i := range sc.lst {
s := sc.lst[last-i]
if s.GetTimeRange().End.Before(timeRange.Start) {
break
}
if s.Overlapping(timeRange) {
s.incRef()
tt = append(tt, s)
Expand Down
6 changes: 3 additions & 3 deletions banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

Expand Down Expand Up @@ -67,7 +66,8 @@ type SupplyTSDB[T TSTable] func() T
// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
Query index.Query
Order *model.OrderBy
Order *index.OrderBy
TimeRange *timestamp.TimeRange
Projection []index.FieldKey
PreloadSize int
}
Expand All @@ -81,7 +81,7 @@ type FieldResultList []FieldResult
// IndexDB is the interface of index database.
type IndexDB interface {
Write(docs index.Documents) error
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (pbv1.SeriesList, FieldResultList, error)
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (pbv1.SeriesList, FieldResultList, []int64, [][]byte, error)
}

// TSDB allows listing and getting shard details.
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *measure) parseSpec() (err error) {
if s.schema.Interval != "" {
s.interval, err = timestamp.ParseDuration(s.schema.Interval)
}
s.indexRuleLocators, s.fieldIndexLocation = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules)
s.indexRuleLocators, s.fieldIndexLocation = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules, s.schema.IndexMode)

return err
}
Expand Down
Loading
Loading