Skip to content

Commit

Permalink
bug: fix index mode still writing to the data files (#559)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Nov 22, 2024
1 parent 0794832 commit 05cf5e1
Show file tree
Hide file tree
Showing 25 changed files with 184 additions and 80 deletions.
10 changes: 8 additions & 2 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,14 @@ func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64,
return si, nil
}

func (s *seriesIndex) Write(docs index.Documents) error {
return s.store.SeriesBatch(index.Batch{
// Insert writes docs to the series index via the store's insert-if-absent
// batch path, so documents whose IDs already exist are left untouched.
func (s *seriesIndex) Insert(docs index.Documents) error {
	batch := index.Batch{Documents: docs}
	return s.store.InsertSeriesBatch(batch)
}

// Update upserts docs into the series index via the store's update batch
// path, replacing any existing documents with the same IDs.
func (s *seriesIndex) Update(docs index.Documents) error {
	batch := index.Batch{Documents: docs}
	return s.store.UpdateSeriesBatch(batch)
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
copy(doc.EntityValues, series.Buffer)
docs = append(docs, doc)
}
require.NoError(t, si.Write(docs))
require.NoError(t, si.Insert(docs))
// Restart the index
require.NoError(t, si.Close())
si, err = newSeriesIndex(ctx, path, 0, nil)
Expand Down
3 changes: 2 additions & 1 deletion banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ type FieldResultList []FieldResult

// IndexDB is the interface of index database.
type IndexDB interface {
Write(docs index.Documents) error
Insert(docs index.Documents) error
Update(docs index.Documents) error
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (pbv1.SeriesList, FieldResultList, []int64, [][]byte, error)
}

Expand Down
11 changes: 6 additions & 5 deletions banyand/measure/datapoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ type dataPointsInTable struct {
}

type dataPointsInGroup struct {
tsdb storage.TSDB[*tsTable, option]
docs index.Documents
tables []*dataPointsInTable
segments []storage.Segment[*tsTable, option]
latestTS int64
tsdb storage.TSDB[*tsTable, option]
metadataDocs index.Documents
indexModeDocs index.Documents
tables []*dataPointsInTable
segments []storage.Segment[*tsTable, option]
latestTS int64
}
65 changes: 47 additions & 18 deletions banyand/measure/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
break
}
}
shardID := common.ShardID(writeEvent.ShardId)
if dpt == nil {
if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID); err != nil {
return nil, fmt.Errorf("cannot create data points in table: %w", err)
}
}
dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts)
dpt.dataPoints.versions = append(dpt.dataPoints.versions, req.DataPoint.Version)
stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
if !ok {
return nil, fmt.Errorf("cannot find measure definition: %s", req.GetMetadata())
Expand All @@ -102,14 +94,39 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
if fLen > len(stm.schema.GetTagFamilies()) {
return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema)
}

shardID := common.ShardID(writeEvent.ShardId)
if dpt == nil {
if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID, stm.schema.IndexMode); err != nil {
return nil, fmt.Errorf("cannot create data points in table: %w", err)
}
}

series := &pbv1.Series{
Subject: req.Metadata.Name,
EntityValues: writeEvent.EntityValues,
}
if err := series.Marshal(); err != nil {
return nil, fmt.Errorf("cannot marshal series: %w", err)
}

tagFamily, fields := w.handleTagFamily(stm, req)
if stm.schema.IndexMode {
doc := index.Document{
DocID: uint64(series.ID),
EntityValues: series.Buffer,
Fields: fields,
}
doc.Timestamp = ts
dpg.indexModeDocs = append(dpg.indexModeDocs, doc)
return dst, nil
}

dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamily)
dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts)
dpt.dataPoints.versions = append(dpt.dataPoints.versions, req.DataPoint.Version)
dpt.dataPoints.seriesIDs = append(dpt.dataPoints.seriesIDs, series.ID)

field := nameValues{}
for i := range stm.GetSchema().GetFields() {
var v *modelv1.FieldValue
Expand All @@ -125,8 +142,6 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
))
}
dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
tagFamily, fields := w.handleTagFamily(stm, req)
dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamily)

if stm.processorManager != nil {
stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{
Expand All @@ -144,15 +159,14 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
EntityValues: series.Buffer,
Fields: fields,
}
if stm.schema.IndexMode {
doc.Timestamp = ts
}
dpg.docs = append(dpg.docs, doc)
dpg.metadataDocs = append(dpg.metadataDocs, doc)

return dst, nil
}

func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup, t time.Time, ts int64, shardID common.ShardID) (*dataPointsInTable, error) {
func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup,
t time.Time, ts int64, shardID common.ShardID, indexMode bool,
) (*dataPointsInTable, error) {
var segment storage.Segment[*tsTable, option]
for _, seg := range dpg.segments {
if seg.GetTimeRange().Contains(ts) {
Expand All @@ -167,6 +181,12 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi
}
dpg.segments = append(dpg.segments, segment)
}
if indexMode {
return &dataPointsInTable{
timeRange: segment.GetTimeRange(),
}, nil
}

tstb, err := segment.CreateTSTableIfNotExist(shardID)
if err != nil {
return nil, fmt.Errorf("cannot create ts table: %w", err)
Expand Down Expand Up @@ -295,11 +315,20 @@ func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Me
g := groups[i]
for j := range g.tables {
dps := g.tables[j]
dps.tsTable.mustAddDataPoints(&dps.dataPoints)
if dps.tsTable != nil {
dps.tsTable.mustAddDataPoints(&dps.dataPoints)
}
}
for _, segment := range g.segments {
if err := segment.IndexDB().Write(g.docs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
if len(g.metadataDocs) > 0 {
if err := segment.IndexDB().Insert(g.metadataDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write metadata")
}
}
if len(g.indexModeDocs) > 0 {
if err := segment.IndexDB().Update(g.indexModeDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
}
}
segment.DecRef()
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/stream/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func write(b *testing.B, p parameter, esList []*elements, docsList []index.Docum
}
seg, err := db.CreateSegmentIfNotExist(time.Unix(0, esList[0].timestamps[0]))
require.NoError(b, err)
seg.IndexDB().Write(docs)
seg.IndexDB().Insert(docs)

tst, err := seg.CreateTSTableIfNotExist(common.ShardID(0))
require.NoError(b, err)
Expand Down
2 changes: 1 addition & 1 deletion banyand/stream/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Me
}
if len(g.docs) > 0 {
for _, segment := range g.segments {
if err := segment.IndexDB().Write(g.docs); err != nil {
if err := segment.IndexDB().Insert(g.docs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
}
segment.DecRef()
Expand Down
3 changes: 2 additions & 1 deletion pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ type Batch struct {
// Writer allows writing fields and docID in a document to an index.
type Writer interface {
Batch(batch Batch) error
SeriesBatch(batch Batch) error
InsertSeriesBatch(batch Batch) error
UpdateSeriesBatch(batch Batch) error
}

// FieldIterable allows building a FieldIterator.
Expand Down
64 changes: 41 additions & 23 deletions pkg/index/inverted/inverted_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (

var emptySeries = make([]index.SeriesDocument, 0)

func (s *store) SeriesBatch(batch index.Batch) error {
func (s *store) InsertSeriesBatch(batch index.Batch) error {
if len(batch.Documents) == 0 {
return nil
}
Expand All @@ -49,34 +49,52 @@ func (s *store) SeriesBatch(batch index.Batch) error {
b := generateBatch()
defer releaseBatch(b)
for _, d := range batch.Documents {
doc := bluge.NewDocument(convert.BytesToString(d.EntityValues))
for _, f := range d.Fields {
tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.Term)
if !f.Index {
tf.FieldOptions = 0
} else if !f.NoSort {
tf.Sortable()
}
doc := toDoc(d)
b.InsertIfAbsent(doc.ID(), doc)
}
return s.writer.Batch(b)
}

if f.Store {
tf.StoreValue()
}
if f.Key.Analyzer != index.AnalyzerUnspecified {
tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
}
doc.AddField(tf)
// UpdateSeriesBatch upserts every document in batch into the index,
// overwriting any existing entry that shares the same document ID.
// It is a no-op when the batch is empty or the store is closing.
func (s *store) UpdateSeriesBatch(batch index.Batch) error {
	docs := batch.Documents
	if len(docs) == 0 {
		return nil
	}
	// Refuse the write once shutdown has begun.
	if !s.closer.AddRunning() {
		return nil
	}
	defer s.closer.Done()

	blugeBatch := generateBatch()
	defer releaseBatch(blugeBatch)
	for i := range docs {
		doc := toDoc(docs[i])
		blugeBatch.Update(doc.ID(), doc)
	}
	return s.writer.Batch(blugeBatch)
}

func toDoc(d index.Document) *bluge.Document {
doc := bluge.NewDocument(convert.BytesToString(d.EntityValues))
for _, f := range d.Fields {
tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.Term)
if !f.Index {
tf.FieldOptions = 0
} else if !f.NoSort {
tf.Sortable()
}

if d.Timestamp > 0 {
doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue())
if f.Store {
tf.StoreValue()
}
if len(d.Fields) == 0 {
b.InsertIfAbsent(doc.ID(), doc)
} else {
b.Update(doc.ID(), doc)
if f.Key.Analyzer != index.AnalyzerUnspecified {
tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
}
doc.AddField(tf)
}
return s.writer.Batch(b)

if d.Timestamp > 0 {
doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue())
}
return doc
}

// BuildQuery implements index.SeriesStore.
Expand Down
44 changes: 31 additions & 13 deletions pkg/index/inverted/inverted_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package inverted
import (
"context"
"maps"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -63,7 +64,7 @@ func TestStore_Search(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Test cases
tests := []struct {
Expand Down Expand Up @@ -203,6 +204,12 @@ func TestStore_Search(t *testing.T) {
require.NoError(t, err)
got, err := s.Search(context.Background(), tt.projection, query)
require.NoError(t, err)
sort.Slice(tt.want, func(i, j int) bool {
return string(tt.want[i].Key.EntityValues) < string(tt.want[j].Key.EntityValues)
})
sort.Slice(got, func(i, j int) bool {
return string(got[i].Key.EntityValues) < string(got[j].Key.EntityValues)
})
assert.Equal(t, tt.want, got)
})
}
Expand All @@ -222,7 +229,7 @@ func TestStore_SearchWildcard(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Test cases
tests := []struct {
Expand Down Expand Up @@ -316,7 +323,7 @@ func TestStore_SearchPrefix(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Test cases
tests := []struct {
Expand Down Expand Up @@ -389,7 +396,7 @@ func TestStore_SearchWithSecondaryQuery(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
insertData(tester, s)

// Define the secondary query
secondaryQuery := &queryNode{
Expand Down Expand Up @@ -500,7 +507,7 @@ func TestStore_SeriesSort(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
updateData(tester, s)

// Define the order by field
orderBy := &index.OrderBy{
Expand Down Expand Up @@ -729,7 +736,7 @@ func TestStore_TimestampSort(t *testing.T) {
}()

// Setup some data
setupData(tester, s)
updateData(tester, s)

// Define the order by field
orderBy := &index.OrderBy{
Expand Down Expand Up @@ -883,7 +890,13 @@ func TestStore_TimestampSort(t *testing.T) {
}
}

func setupData(tester *require.Assertions, s index.SeriesStore) {
// insertData seeds s with the shared fixture documents using the
// insert-if-absent batch path; both batches must apply without error.
func insertData(tester *require.Assertions, s index.SeriesStore) {
	first, second := generateDocs()
	tester.NoError(s.InsertSeriesBatch(first))
	tester.NoError(s.InsertSeriesBatch(second))
}

func generateDocs() (index.Batch, index.Batch) {
series1 := index.Document{
EntityValues: []byte("test1"),
}
Expand Down Expand Up @@ -965,10 +978,15 @@ func setupData(tester *require.Assertions, s index.SeriesStore) {
},
Timestamp: int64(2001),
}
tester.NoError(s.SeriesBatch(index.Batch{
Documents: []index.Document{series1, series2, series4, series3},
}))
tester.NoError(s.SeriesBatch(index.Batch{
Documents: []index.Document{series3},
}))
return index.Batch{
Documents: []index.Document{series1, series2, series4, series3},
}, index.Batch{
Documents: []index.Document{series3},
}
}

// updateData seeds s with the shared fixture documents using the
// upsert batch path; both batches must apply without error.
func updateData(tester *require.Assertions, s index.SeriesStore) {
	first, second := generateDocs()
	tester.NoError(s.UpdateSeriesBatch(first))
	tester.NoError(s.UpdateSeriesBatch(second))
}
Loading

0 comments on commit 05cf5e1

Please sign in to comment.