Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: fix index mode still writing to the data files #559

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,14 @@ func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64,
return si, nil
}

func (s *seriesIndex) Write(docs index.Documents) error {
return s.store.SeriesBatch(index.Batch{
// Insert writes docs to the series index through the store's
// insert-only batch path; documents whose IDs already exist are not
// rewritten.
func (s *seriesIndex) Insert(docs index.Documents) error {
	batch := index.Batch{Documents: docs}
	return s.store.InsertSeriesBatch(batch)
}

// Update writes docs to the series index through the store's upsert
// batch path, replacing existing documents with the same ID.
func (s *seriesIndex) Update(docs index.Documents) error {
	batch := index.Batch{Documents: docs}
	return s.store.UpdateSeriesBatch(batch)
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
copy(doc.EntityValues, series.Buffer)
docs = append(docs, doc)
}
require.NoError(t, si.Write(docs))
require.NoError(t, si.Insert(docs))
// Restart the index
require.NoError(t, si.Close())
si, err = newSeriesIndex(ctx, path, 0, nil)
Expand Down
3 changes: 2 additions & 1 deletion banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ type FieldResultList []FieldResult

// IndexDB is the interface of index database.
type IndexDB interface {
Write(docs index.Documents) error
Insert(docs index.Documents) error
Update(docs index.Documents) error
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (pbv1.SeriesList, FieldResultList, []int64, [][]byte, error)
}

Expand Down
11 changes: 6 additions & 5 deletions banyand/measure/datapoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ type dataPointsInTable struct {
}

type dataPointsInGroup struct {
tsdb storage.TSDB[*tsTable, option]
docs index.Documents
tables []*dataPointsInTable
segments []storage.Segment[*tsTable, option]
latestTS int64
tsdb storage.TSDB[*tsTable, option]
metadataDocs index.Documents
indexModeDocs index.Documents
tables []*dataPointsInTable
segments []storage.Segment[*tsTable, option]
latestTS int64
}
65 changes: 47 additions & 18 deletions banyand/measure/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
break
}
}
shardID := common.ShardID(writeEvent.ShardId)
if dpt == nil {
if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID); err != nil {
return nil, fmt.Errorf("cannot create data points in table: %w", err)
}
}
dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts)
dpt.dataPoints.versions = append(dpt.dataPoints.versions, req.DataPoint.Version)
stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
if !ok {
return nil, fmt.Errorf("cannot find measure definition: %s", req.GetMetadata())
Expand All @@ -102,14 +94,39 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
if fLen > len(stm.schema.GetTagFamilies()) {
return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema)
}

shardID := common.ShardID(writeEvent.ShardId)
if dpt == nil {
if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID, stm.schema.IndexMode); err != nil {
return nil, fmt.Errorf("cannot create data points in table: %w", err)
}
}

series := &pbv1.Series{
Subject: req.Metadata.Name,
EntityValues: writeEvent.EntityValues,
}
if err := series.Marshal(); err != nil {
return nil, fmt.Errorf("cannot marshal series: %w", err)
}

tagFamily, fields := w.handleTagFamily(stm, req)
if stm.schema.IndexMode {
doc := index.Document{
DocID: uint64(series.ID),
EntityValues: series.Buffer,
Fields: fields,
}
doc.Timestamp = ts
dpg.indexModeDocs = append(dpg.indexModeDocs, doc)
return dst, nil
}

dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamily)
dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts)
dpt.dataPoints.versions = append(dpt.dataPoints.versions, req.DataPoint.Version)
dpt.dataPoints.seriesIDs = append(dpt.dataPoints.seriesIDs, series.ID)

field := nameValues{}
for i := range stm.GetSchema().GetFields() {
var v *modelv1.FieldValue
Expand All @@ -125,8 +142,6 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
))
}
dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
tagFamily, fields := w.handleTagFamily(stm, req)
dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamily)

if stm.processorManager != nil {
stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{
Expand All @@ -144,15 +159,14 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
EntityValues: series.Buffer,
Fields: fields,
}
if stm.schema.IndexMode {
doc.Timestamp = ts
}
dpg.docs = append(dpg.docs, doc)
dpg.metadataDocs = append(dpg.metadataDocs, doc)

return dst, nil
}

func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup, t time.Time, ts int64, shardID common.ShardID) (*dataPointsInTable, error) {
func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup,
t time.Time, ts int64, shardID common.ShardID, indexMode bool,
) (*dataPointsInTable, error) {
var segment storage.Segment[*tsTable, option]
for _, seg := range dpg.segments {
if seg.GetTimeRange().Contains(ts) {
Expand All @@ -167,6 +181,12 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi
}
dpg.segments = append(dpg.segments, segment)
}
if indexMode {
return &dataPointsInTable{
timeRange: segment.GetTimeRange(),
}, nil
}

tstb, err := segment.CreateTSTableIfNotExist(shardID)
if err != nil {
return nil, fmt.Errorf("cannot create ts table: %w", err)
Expand Down Expand Up @@ -295,11 +315,20 @@ func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Me
g := groups[i]
for j := range g.tables {
dps := g.tables[j]
dps.tsTable.mustAddDataPoints(&dps.dataPoints)
if dps.tsTable != nil {
dps.tsTable.mustAddDataPoints(&dps.dataPoints)
}
}
for _, segment := range g.segments {
if err := segment.IndexDB().Write(g.docs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
if len(g.metadataDocs) > 0 {
if err := segment.IndexDB().Insert(g.metadataDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write metadata")
}
}
if len(g.indexModeDocs) > 0 {
if err := segment.IndexDB().Update(g.indexModeDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
}
}
segment.DecRef()
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/stream/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func write(b *testing.B, p parameter, esList []*elements, docsList []index.Docum
}
seg, err := db.CreateSegmentIfNotExist(time.Unix(0, esList[0].timestamps[0]))
require.NoError(b, err)
seg.IndexDB().Write(docs)
seg.IndexDB().Insert(docs)

tst, err := seg.CreateTSTableIfNotExist(common.ShardID(0))
require.NoError(b, err)
Expand Down
2 changes: 1 addition & 1 deletion banyand/stream/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Me
}
if len(g.docs) > 0 {
for _, segment := range g.segments {
if err := segment.IndexDB().Write(g.docs); err != nil {
if err := segment.IndexDB().Insert(g.docs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
}
segment.DecRef()
Expand Down
3 changes: 2 additions & 1 deletion pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ type Batch struct {
// Writer allows writing fields and docID in a document to an index.
type Writer interface {
Batch(batch Batch) error
SeriesBatch(batch Batch) error
InsertSeriesBatch(batch Batch) error
UpdateSeriesBatch(batch Batch) error
}

// FieldIterable allows building a FieldIterator.
Expand Down
64 changes: 41 additions & 23 deletions pkg/index/inverted/inverted_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (

var emptySeries = make([]index.SeriesDocument, 0)

func (s *store) SeriesBatch(batch index.Batch) error {
func (s *store) InsertSeriesBatch(batch index.Batch) error {
if len(batch.Documents) == 0 {
return nil
}
Expand All @@ -49,34 +49,52 @@ func (s *store) SeriesBatch(batch index.Batch) error {
b := generateBatch()
defer releaseBatch(b)
for _, d := range batch.Documents {
doc := bluge.NewDocument(convert.BytesToString(d.EntityValues))
for _, f := range d.Fields {
tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.Term)
if !f.Index {
tf.FieldOptions = 0
} else if !f.NoSort {
tf.Sortable()
}
doc := toDoc(d)
b.InsertIfAbsent(doc.ID(), doc)
}
return s.writer.Batch(b)
}

if f.Store {
tf.StoreValue()
}
if f.Key.Analyzer != index.AnalyzerUnspecified {
tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
}
doc.AddField(tf)
// UpdateSeriesBatch converts every document in batch into a bluge
// document and applies it to the writer as an update, so an existing
// entry with the same ID is replaced by the new one.
func (s *store) UpdateSeriesBatch(batch index.Batch) error {
	docs := batch.Documents
	if len(docs) == 0 {
		// Nothing to write; skip the closer/writer entirely.
		return nil
	}
	// Bail out if the store is already shutting down.
	if !s.closer.AddRunning() {
		return nil
	}
	defer s.closer.Done()

	blugeBatch := generateBatch()
	defer releaseBatch(blugeBatch)
	for i := range docs {
		doc := toDoc(docs[i])
		blugeBatch.Update(doc.ID(), doc)
	}
	return s.writer.Batch(blugeBatch)
}

func toDoc(d index.Document) *bluge.Document {
doc := bluge.NewDocument(convert.BytesToString(d.EntityValues))
for _, f := range d.Fields {
tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.Term)
if !f.Index {
tf.FieldOptions = 0
} else if !f.NoSort {
tf.Sortable()
}

if d.Timestamp > 0 {
doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue())
if f.Store {
tf.StoreValue()
}
if len(d.Fields) == 0 {
b.InsertIfAbsent(doc.ID(), doc)
} else {
b.Update(doc.ID(), doc)
if f.Key.Analyzer != index.AnalyzerUnspecified {
tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
}
doc.AddField(tf)
}
return s.writer.Batch(b)

if d.Timestamp > 0 {
doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue())
}
return doc
}

// BuildQuery implements index.SeriesStore.
Expand Down
44 changes: 31 additions & 13 deletions pkg/index/inverted/inverted_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package inverted
import (
"context"
"maps"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -63,7 +64,7 @@ func TestStore_Search(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Test cases
tests := []struct {
Expand Down Expand Up @@ -203,6 +204,12 @@ func TestStore_Search(t *testing.T) {
require.NoError(t, err)
got, err := s.Search(context.Background(), tt.projection, query)
require.NoError(t, err)
sort.Slice(tt.want, func(i, j int) bool {
return string(tt.want[i].Key.EntityValues) < string(tt.want[j].Key.EntityValues)
})
sort.Slice(got, func(i, j int) bool {
return string(got[i].Key.EntityValues) < string(got[j].Key.EntityValues)
})
assert.Equal(t, tt.want, got)
})
}
Expand All @@ -222,7 +229,7 @@ func TestStore_SearchWildcard(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Test cases
tests := []struct {
Expand Down Expand Up @@ -316,7 +323,7 @@ func TestStore_SearchPrefix(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Test cases
tests := []struct {
Expand Down Expand Up @@ -389,7 +396,7 @@ func TestStore_SearchWithSecondaryQuery(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Define the secondary query
secondaryQuery := &queryNode{
Expand Down Expand Up @@ -500,7 +507,7 @@ func TestStore_SeriesSort(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
updateData(tester, s)

// Define the order by field
orderBy := &index.OrderBy{
Expand Down Expand Up @@ -729,7 +736,7 @@ func TestStore_TimestampSort(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
updateData(tester, s)

// Define the order by field
orderBy := &index.OrderBy{
Expand Down Expand Up @@ -883,7 +890,13 @@ func TestStore_TimestampSort(t *testing.T) {
}
}

func setupData(tester *require.Assertions, s index.SeriesStore) {
// insertData seeds s with the shared fixture batches through the
// insert-only path.
func insertData(tester *require.Assertions, s index.SeriesStore) {
	first, second := generateDocs()
	for _, batch := range []index.Batch{first, second} {
		tester.NoError(s.InsertSeriesBatch(batch))
	}
}

func generateDocs() (index.Batch, index.Batch) {
series1 := index.Document{
EntityValues: []byte("test1"),
}
Expand Down Expand Up @@ -965,10 +978,15 @@ func setupData(tester *require.Assertions, s index.SeriesStore) {
},
Timestamp: int64(2001),
}
tester.NoError(s.SeriesBatch(index.Batch{
Documents: []index.Document{series1, series2, series4, series3},
}))
tester.NoError(s.SeriesBatch(index.Batch{
Documents: []index.Document{series3},
}))
return index.Batch{
Documents: []index.Document{series1, series2, series4, series3},
}, index.Batch{
Documents: []index.Document{series3},
}
}

// updateData seeds s with the shared fixture batches through the
// upsert path.
func updateData(tester *require.Assertions, s index.SeriesStore) {
	first, second := generateDocs()
	for _, batch := range []index.Batch{first, second} {
		tester.NoError(s.UpdateSeriesBatch(batch))
	}
}
Loading
Loading