diff --git a/CHANGES.md b/CHANGES.md
index bbdec150f..e537ba38d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@ Release Notes.
 - Implement Write-ahead Logging
 - Document the clustering.
 - Support multiple roles for banyand server.
+- Support recovering the buffer from the WAL.
 
 ### Bugs
 
diff --git a/api/common/id.go b/api/common/id.go
index 0d8c58907..6cc879ca8 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -40,32 +40,6 @@ func (s SeriesID) Marshal() []byte {
 	return convert.Uint64ToBytes(uint64(s))
 }
 
-// GlobalSeriesID identities a series in a shard.
-type GlobalSeriesID struct {
-	Name     string
-	SeriesID SeriesID
-}
-
-// Marshal encodes global series id to bytes.
-func (s GlobalSeriesID) Marshal() []byte {
-	seriesIDBytes := convert.Uint64ToBytes(uint64(s.SeriesID))
-	nameBytes := []byte(s.Name)
-	return append(seriesIDBytes, nameBytes...)
-}
-
-// Volume returns the estimated bytes volume of global series id.
-func (s GlobalSeriesID) Volume() int {
-	return 8 + len(s.Name)
-}
-
-// ParseGlobalSeriesID parses global series id from bytes.
-func ParseGlobalSeriesID(b []byte) GlobalSeriesID {
-	return GlobalSeriesID{
-		SeriesID: SeriesID(convert.BytesToUint64(b[:8])),
-		Name:     string(b[8:]),
-	}
-}
-
 // positionKey is a context key to store the module position.
 var positionKey = contextPositionKey{}
 
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 08765a5d1..462d5066b 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -41,6 +41,7 @@ import (
 const (
 	defaultNumBufferShards  = 2
 	defaultWriteConcurrency = 1000
+	defaultWriteWal         = true
 	plain                   = "tst"
 	encoded                 = "encoded"
 )
@@ -56,6 +57,7 @@ type tsTable struct {
 	buffer            *tsdb.Buffer
 	closeBufferTimer  *time.Timer
 	position          common.Position
+	path              string
 	bufferSize        int64
 	encoderBufferSize int64
 	lock              sync.Mutex
@@ -72,13 +74,13 @@ func (t *tsTable) openBuffer() (err error) {
 		return nil
 	}
 	bufferSize := int(t.encoderBufferSize / defaultNumBufferShards)
-	if t.encoderBuffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
-		defaultWriteConcurrency, defaultNumBufferShards, t.encoderFlush); err != nil {
+	if t.encoderBuffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
+		defaultWriteConcurrency, defaultNumBufferShards, t.encoderFlush, defaultWriteWal, &t.path); err != nil {
 		return fmt.Errorf("failed to create encoder buffer: %w", err)
 	}
 	bufferSize = int(t.bufferSize / defaultNumBufferShards)
-	if t.buffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
-		defaultWriteConcurrency, defaultNumBufferShards, t.flush); err != nil {
+	if t.buffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
+		defaultWriteConcurrency, defaultNumBufferShards, t.flush, defaultWriteWal, &t.path); err != nil {
 		return fmt.Errorf("failed to create buffer: %w", err)
 	}
 	end := t.EndTime()
@@ -153,22 +155,19 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {
 
 func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
 	if t.encoderBuffer != nil {
-		t.writeToBuffer(key, val, ts)
-		return nil
+		return t.writeToBuffer(key, val, ts)
 	}
 	if err := t.openBuffer(); err != nil {
 		return err
 	}
-	t.writeToBuffer(key, val, ts)
-	return nil
+	return t.writeToBuffer(key, val, ts)
 }
 
-func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) {
+func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) error {
 	if t.toEncode(key) {
-		t.encoderBuffer.Write(key, val, ts)
-	} else {
-		t.buffer.Write(key, val, ts)
+		return t.encoderBuffer.Write(key, val, ts)
 	}
+	return t.buffer.Write(key, val, ts)
 }
 
 func (t *tsTable) encoderFlush(shardIndex int, skl *skl.Skiplist) error {
@@ -228,6 +227,7 @@ func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker tsdb.BlockExpiryTracker
 		encoderSST:         encoderSST,
 		sst:                sst,
 		BlockExpiryTracker: &blockExpiryTracker,
+		path:               root,
 	}
 	if table.IsActive() {
 		if err := table.openBuffer(); err != nil {
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 924f53f63..6af8bf2f4 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -113,15 +113,13 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {
 
 func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
 	if t.buffer != nil {
-		t.buffer.Write(key, val, ts)
-		return nil
+		return t.buffer.Write(key, val, ts)
 	}
 	if err := t.openBuffer(); err != nil {
 		return err
 	}
-	t.buffer.Write(key, val, ts)
-	return nil
+	return t.buffer.Write(key, val, ts)
 }
 
 func (t *tsTable) flush(shardIndex int, skl *skl.Skiplist) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index c586659be..1fc94283c 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/dgraph-io/badger/v3/y"
+	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -32,11 +33,13 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/meter"
 	"github.com/apache/skywalking-banyandb/pkg/run"
+	"github.com/apache/skywalking-banyandb/pkg/wal"
 )
 
 const (
-	defaultSize = 1 << 20 // 1MB
-	nodeAlign   = int(unsafe.Sizeof(uint64(0))) - 1
+	defaultSize        = 1 << 20 // 1MB
+	nodeAlign          = int(unsafe.Sizeof(uint64(0))) - 1
+	defaultWalSyncMode = false
 )
 
 var (
@@ -53,13 +56,15 @@ func init() {
 }
 
 type operation struct {
-	key   []byte
-	value []byte
-	epoch uint64
+	recoveryDoneFn func()
+	key            []byte
+	value          []byte
+	epoch          uint64
 }
 
 type flushEvent struct {
-	data *skl.Skiplist
+	data         *skl.Skiplist
+	walSegmentID wal.SegmentID
 }
 
 type onFlush func(shardIndex int, skl *skl.Skiplist) error
@@ -71,12 +76,15 @@ type bufferShardBucket struct {
 	writeWaitGroup   *sync.WaitGroup
 	flushWaitGroup   *sync.WaitGroup
 	log              *logger.Logger
+	wal              wal.WAL
 	immutables       []*skl.Skiplist
 	labelValues      []string
 	shardLabelValues []string
 	index            int
 	capacity         int
 	mutex            sync.RWMutex
+	walSyncMode      bool
+	enableWal        bool
 }
 
 // Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
@@ -88,11 +96,18 @@ type Buffer struct {
 	writeWaitGroup sync.WaitGroup
 	flushWaitGroup sync.WaitGroup
 	numShards      int
+	enableWal      bool
 	closerOnce     sync.Once
 }
 
 // NewBuffer creates a new Buffer instance with the given parameters.
 func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) {
+	return NewBufferWithWal(log, position, flushSize, writeConcurrency, numShards, onFlushFn, false, nil)
+}
+
+// NewBufferWithWal creates a new Buffer instance that, when enableWal is set, persists every write to a write-ahead log under walPath so the buffer can be recovered after a restart.
+func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, onFlushFn onFlush, enableWal bool, walPath *string,
+) (*Buffer, error) {
 	buckets := make([]bufferShardBucket, numShards)
 	buffer := &Buffer{
 		buckets:     buckets,
@@ -100,6 +115,7 @@ func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeCon
 		onFlushFn:   onFlushFn,
 		entryCloser: run.NewCloser(1),
 		log:         log.Named("buffer"),
+		enableWal:   enableWal,
 	}
 	buffer.writeWaitGroup.Add(numShards)
 	buffer.flushWaitGroup.Add(numShards)
@@ -115,17 +131,27 @@ func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeCon
 			log:              buffer.log.Named(fmt.Sprintf("shard-%d", i)),
 			labelValues:      append(position.LabelValues(), fmt.Sprintf("%d", i)),
 			shardLabelValues: position.ShardLabelValues(),
+			enableWal:        enableWal,
 		}
 		buckets[i].start(onFlushFn)
+		if enableWal {
+			if walPath == nil {
+				return nil, errors.New("wal path is required")
+			}
+			shardWalPath := fmt.Sprintf("%s/buffer-%d", *walPath, i)
+			if err := buckets[i].startWal(shardWalPath, defaultWalSyncMode); err != nil {
+				return nil, errors.Wrap(err, "failed to start wal")
+			}
+		}
 		maxBytes.Set(float64(flushSize), buckets[i].labelValues...)
 	}
 	return buffer, nil
 }
 
 // Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
-func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
+func (b *Buffer) Write(key, value []byte, timestamp time.Time) error {
 	if b == nil || !b.entryCloser.AddRunning() {
-		return
+		return errors.New("buffer is invalid")
 	}
 	defer b.entryCloser.Done()
 	index := b.getShardIndex(key)
@@ -133,7 +159,15 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
 		b.log.Debug().Uint64("shard", index).Bytes("key", key).
 			Time("ts", timestamp).Msg("route a shard")
 	}
+
+	if b.enableWal {
+		if err := b.buckets[index].writeWal(key, value, timestamp); err != nil {
+			return errors.Wrap(err, "failed to write wal")
+		}
+	}
+
 	b.buckets[index].writeCh <- operation{key: key, value: value, epoch: uint64(timestamp.UnixNano())}
+	return nil
 }
 
 // Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
@@ -179,6 +213,11 @@ func (b *Buffer) Close() error {
 		}
 		for i := 0; i < b.numShards; i++ {
 			close(b.buckets[i].flushCh)
+			if b.enableWal {
+				if err := b.buckets[i].wal.Close(); err != nil {
+					b.buckets[i].log.Err(err).Msg("closing buffer shard wal failed")
+				}
+			}
 		}
 		b.flushWaitGroup.Wait()
 	})
@@ -218,13 +257,24 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
 		for event := range bsb.flushCh {
 			oldSkipList := event.data
 			memSize := oldSkipList.MemSize()
+			onFlushFnDone := false
 			t1 := time.Now()
 			for {
-				if err := onFlushFn(bsb.index, oldSkipList); err != nil {
-					bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
-					flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
-					time.Sleep(time.Second)
-					continue
+				if !onFlushFnDone {
+					if err := onFlushFn(bsb.index, oldSkipList); err != nil {
+						bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
+						flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
+						time.Sleep(time.Second)
+						continue
+					}
+					onFlushFnDone = true
+				}
+				if bsb.enableWal {
+					if err := bsb.wal.Delete(event.walSegmentID); err != nil {
+						bsb.log.Err(err).Msg("delete wal segment file failed. Retrying...")
Retrying...") + time.Sleep(time.Second) + continue + } } break } @@ -249,23 +299,38 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) { volume += len(k) + int(v.EncodedSize()) + skl.MaxNodeSize + nodeAlign memSize := bsb.mutable.MemSize() mutableBytes.Set(float64(memSize), bsb.labelValues...) - if volume >= bsb.capacity || memSize >= int64(bsb.capacity) { - bsb.triggerFlushing() - volume = 0 + if op.recoveryDoneFn == nil && (volume >= bsb.capacity || memSize >= int64(bsb.capacity)) { + if err := bsb.triggerFlushing(); err != nil { + bsb.log.Err(err).Msg("triggering flushing failed") + } else { + volume = 0 + } } bsb.mutable.Put(k, v) + if bsb.enableWal && op.recoveryDoneFn != nil { + op.recoveryDoneFn() + } } }() } -func (bsb *bufferShardBucket) triggerFlushing() { +func (bsb *bufferShardBucket) triggerFlushing() error { + var walSegmentID wal.SegmentID + if bsb.enableWal { + segment, err := bsb.wal.Rotate() + if err != nil { + return errors.Wrap(err, "rotating wal failed") + } + walSegmentID = segment.GetSegmentID() + } + for { select { - case bsb.flushCh <- flushEvent{data: bsb.mutable}: + case bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID: walSegmentID}: bsb.mutex.Lock() defer bsb.mutex.Unlock() bsb.swap() - return + return nil default: } time.Sleep(10 * time.Second) @@ -276,3 +341,107 @@ func (bsb *bufferShardBucket) swap() { bsb.immutables = append(bsb.immutables, bsb.mutable) bsb.mutable = skl.NewSkiplist(int64(bsb.capacity)) } + +func (bsb *bufferShardBucket) startWal(path string, syncMode bool) error { + wal, err := wal.New(path, wal.DefaultOptions) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed to create wal: %s", path)) + } + bsb.wal = wal + bsb.walSyncMode = syncMode + bsb.log.Info().Msg(fmt.Sprintf( + "wal started with path: %s, sync mode: %v", path, syncMode)) + + if err = bsb.recoveryWal(); err != nil { + return errors.Wrap(err, "failed to recovery wal") + } + return nil +} + +func (bsb *bufferShardBucket) recoveryWal() error { + segments, err := bsb.wal.ReadAllSegments() + if err != nil { + return errors.Wrap(err, "failed to load wal segments") + } + + recoveredRecords := 0 + for index, segment := range segments { + recoveredRecords += len(segment.GetLogEntries()) + isWorkSegment := index == len(segments)-1 + if isWorkSegment { + bsb.recoveryWorkSegment(segment) + } else { + bsb.recoveryStableSegment(segment) + } + bsb.log.Info().Msg(fmt.Sprintf( + "recovered %d log records from wal segment %d", + len(segment.GetLogEntries()), + segment.GetSegmentID())) + } + bsb.log.Info().Msg(fmt.Sprintf( + "recovered %d log records from wal", recoveredRecords)) + return nil +} + +func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) { + var wg sync.WaitGroup + wg.Add(len(segment.GetLogEntries())) + for _, logEntry := range segment.GetLogEntries() { + timestamps := logEntry.GetTimestamps() + values := logEntry.GetValues() + elementIndex := 0 + for element := values.Front(); element != nil; element = element.Next() { + timestamp := timestamps[elementIndex] + bsb.writeCh <- operation{ + key: logEntry.GetSeriesID(), + value: element.Value.([]byte), + epoch: uint64(timestamp.UnixNano()), + recoveryDoneFn: func() { + wg.Done() + if bsb.log.Trace().Enabled() { + bsb.log.Trace().Msg(fmt.Sprintf("recovered key: %v, ts: %v", + logEntry.GetSeriesID(), timestamp.UnixNano())) + } + }, + } + elementIndex++ + } + } + wg.Wait() +} + +func (bsb *bufferShardBucket) recoveryStableSegment(segment wal.Segment) { + for _, logEntries := range 
+		timestamps := logEntries.GetTimestamps()
+		values := logEntries.GetValues()
+		elementIndex := 0
+		for element := values.Front(); element != nil; element = element.Next() {
+			timestamp := timestamps[elementIndex]
+			k := y.KeyWithTs(logEntries.GetSeriesID(), uint64(timestamp.UnixNano()))
+			v := y.ValueStruct{Value: element.Value.([]byte)}
+			bsb.mutable.Put(k, v)
+			elementIndex++
+		}
+	}
+	bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID: segment.GetSegmentID()}
+	// Sync recovered data to immutables.
+	bsb.swap()
+}
+
+func (bsb *bufferShardBucket) writeWal(key, value []byte, timestamp time.Time) error {
+	if !bsb.walSyncMode {
+		bsb.wal.Write(key, timestamp, value, nil)
+		return nil
+	}
+
+	var walErr error
+	var wg sync.WaitGroup
+	wg.Add(1)
+	walCallback := func(key []byte, t time.Time, value []byte, err error) {
+		walErr = err
+		wg.Done()
+	}
+	bsb.wal.Write(key, timestamp, value, walCallback)
+	wg.Wait()
+	return walErr
+}
diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go
index bb9483720..7068e9724 100644
--- a/banyand/tsdb/buffer_test.go
+++ b/banyand/tsdb/buffer_test.go
@@ -21,6 +21,9 @@ import (
 	"crypto/rand"
 	"fmt"
 	"math/big"
+	"os"
+	"path/filepath"
+	"strconv"
 	"sync"
 	"time"
 
@@ -118,7 +121,8 @@ var _ = Describe("Buffer", func() {
 			}(ch)
 		}
 
-		buffer, err := tsdb.NewBuffer(log, common.Position{}, 1024, 16, numShards, onFlushFn)
+		var err error
+		buffer, err = tsdb.NewBuffer(log, common.Position{}, 1024, 16, numShards, onFlushFn)
 		defer func() {
 			_ = buffer.Close()
 		}()
@@ -147,4 +151,153 @@ var _ = Describe("Buffer", func() {
 			}
 		})
 	})
+
+	Context("Write and recover wal correctly", func() {
+		writeConcurrency := 2
+		numShards := 2
+		flushSize := 1024
+		baseTime := time.Now()
+		var path string
+
+		BeforeEach(func() {
+			var err error
+			path, err = os.MkdirTemp("", "banyandb-test-buffer-wal-*")
+			Expect(err).ToNot(HaveOccurred())
+		})
+
+		AfterEach(func() {
+			err := os.RemoveAll(path)
+			Expect(err).ToNot(HaveOccurred())
+		})
+
+		It("should write and rotate wal file correctly", func() {
+			var err error
+			var flushMutex sync.Mutex
+
+			shardWalFileHistory := make(map[int][]string)
+			buffer, err = tsdb.NewBufferWithWal(
+				log,
+				common.Position{},
+				flushSize,
+				writeConcurrency,
+				numShards,
+				func(shardIndex int, skl *skl.Skiplist) error {
+					flushMutex.Lock()
+					defer flushMutex.Unlock()
+
+					shardWalDir := filepath.Join(path, "buffer-"+strconv.Itoa(shardIndex))
+					var shardWalList []os.DirEntry
+					shardWalList, err = os.ReadDir(shardWalDir)
+					Expect(err).ToNot(HaveOccurred())
+					for _, shardWalFile := range shardWalList {
+						Expect(shardWalFile.IsDir()).To(BeFalse())
+						Expect(shardWalFile.Name()).To(HaveSuffix(".wal"))
+						shardWalFileHistory[shardIndex] = append(shardWalFileHistory[shardIndex], shardWalFile.Name())
+					}
+					return nil
+				},
+				true,
+				&path)
+			Expect(err).ToNot(HaveOccurred())
+			defer buffer.Close()
+
+			// Write buffer & wal
+			var wg sync.WaitGroup
+			wg.Add(writeConcurrency)
+			for i := 0; i < writeConcurrency; i++ {
+				go func(writerIndex int) {
+					for j := 0; j < numShards; j++ {
+						for k := 0; k < flushSize; k++ {
+							buffer.Write(
+								[]byte(fmt.Sprintf("writer-%d-shard-%d-key-%d", writerIndex, j, k)),
+								[]byte(fmt.Sprintf("writer-%d-shard-%d-value-%d", writerIndex, j, k)),
+								time.UnixMilli(baseTime.UnixMilli()+int64(writerIndex+j+k)))
+						}
+					}
+					wg.Done()
+				}(i)
+			}
+			wg.Wait()
+
+			flushMutex.Lock()
+			defer flushMutex.Unlock()
+
+			// Check wal
+			Expect(len(shardWalFileHistory) == numShards).To(BeTrue())
+			for shardIndex := 0; shardIndex < numShards; shardIndex++ {
+				// Check wal rotate
+				Expect(len(shardWalFileHistory[shardIndex]) > 1).To(BeTrue())
+
+				shardWalDir := filepath.Join(path, "buffer-"+strconv.Itoa(shardIndex))
+				currentShardWalFiles, err := os.ReadDir(shardWalDir)
+				Expect(err).ToNot(HaveOccurred())
+				Expect(len(currentShardWalFiles) <= 2).To(BeTrue())
+				// Check wal delete
+				Expect(len(shardWalFileHistory[shardIndex]) > len(currentShardWalFiles)).To(BeTrue())
+			}
+		})
+
+		It("should recover buffer from wal file correctly", func() {
+			var err error
+			var flushMutex sync.Mutex
+			var bufferFlushed bool
+
+			buffer, err = tsdb.NewBufferWithWal(
+				log,
+				common.Position{},
+				flushSize,
+				writeConcurrency,
+				numShards,
+				func(shardIndex int, skl *skl.Skiplist) error {
+					flushMutex.Lock()
+					defer flushMutex.Unlock()
+
+					if !bufferFlushed {
+						bufferFlushed = true
+					}
+					return nil
+				},
+				true,
+				&path)
+			Expect(err).ToNot(HaveOccurred())
+
+			// Write buffer & wal
+			for i := 0; i < numShards; i++ {
+				buffer.Write(
+					[]byte(fmt.Sprintf("shard-%d-key-1", i)),
+					[]byte(fmt.Sprintf("shard-%d-value-1", i)),
+					time.UnixMilli(baseTime.UnixMilli()+int64(i)))
+			}
+
+			flushMutex.Lock()
+			Expect(bufferFlushed).To(BeFalse())
+			flushMutex.Unlock()
+
+			// Restart buffer
+			buffer.Close()
+			buffer, err = tsdb.NewBufferWithWal(
+				log,
+				common.Position{},
+				flushSize,
+				writeConcurrency,
+				numShards,
+				func(shardIndex int, skl *skl.Skiplist) error {
+					return nil
+				},
+				true,
+				&path)
+			Expect(err).ToNot(HaveOccurred())
+			defer buffer.Close()
+
+			// Check buffer was recovered from wal
+			for i := 0; i < numShards; i++ {
+				expectValue := []byte(fmt.Sprintf("shard-%d-value-1", i))
+				value, exist := buffer.Read(
+					[]byte(fmt.Sprintf("shard-%d-key-1", i)),
+					time.UnixMilli(baseTime.UnixMilli()+int64(i)))
+				Expect(exist).To(BeTrue())
+				Expect(bytes.Equal(expectValue, value)).To(BeTrue())
+			}
+		})
+	})
 })
diff --git a/pkg/convert/string.go b/pkg/convert/string.go
new file mode 100644
index 000000000..5a24db8d4
--- /dev/null
+++ b/pkg/convert/string.go
@@ -0,0 +1,38 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package convert
+
+import (
+	"reflect"
+	"unsafe"
+)
+
+// StringToBytes converts string to bytes.
+func StringToBytes(s string) (b []byte) {
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
+	bh.Data = sh.Data
+	bh.Cap = sh.Len
+	bh.Len = sh.Len
+	return b
+}
+
+// BytesToString converts bytes to string.
+func BytesToString(b []byte) string {
+	return *(*string)(unsafe.Pointer(&b))
+}
diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go
index 39b022828..db8f416d6 100644
--- a/pkg/wal/wal.go
+++ b/pkg/wal/wal.go
@@ -26,6 +26,7 @@ import (
 	"math"
 	"os"
 	"path/filepath"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -35,7 +36,7 @@ import (
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 
-	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
@@ -55,6 +56,7 @@ const (
 	parseTimeStr     = "2006-01-02 15:04:05"
 	maxRetries       = 3
 	maxSegmentID     = uint64(math.MaxUint64) - 1
+	defaultSyncFlush = false
 )
 
 // DefaultOptions for Open().
@@ -62,7 +64,7 @@ var DefaultOptions = &Options{
 	FileSize:            67108864, // 64MB
 	BufferSize:          65535,    // 16KB
 	BufferBatchInterval: 3 * time.Second,
-	NoSync:              false,
+	SyncFlush:           defaultSyncFlush,
 }
 
 // Options for creating Write-ahead Logging.
@@ -71,7 +73,7 @@ type Options struct {
 	FileSize            int
 	BufferSize          int
 	BufferBatchInterval time.Duration
 	FlushQueueSize      int
-	NoSync              bool
+	SyncFlush           bool
 }
 
 // WAL denotes a Write-ahead logging.
@@ -82,7 +84,7 @@ type WAL interface {
 	// Write a logging entity.
 	// It will return immediately when the data is written in the buffer,
 	// The callback function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error))
+	Write(seriesID []byte, timestamp time.Time, data []byte, callback func([]byte, time.Time, []byte, error))
 	// Read specified segment by SegmentID.
 	Read(segmentID SegmentID) (Segment, error)
 	// ReadAllSegments reads all segments sorted by their creation time in ascending order.
@@ -106,7 +108,7 @@
 
 // LogEntry used for attain detail value of WAL entry.
 type LogEntry interface {
-	GetSeriesID() common.GlobalSeriesID
+	GetSeriesID() []byte
 	GetTimestamps() []time.Time
 	GetValues() *list.List
 }
@@ -137,35 +139,56 @@ type segment struct {
 }
 
 type logRequest struct {
-	seriesID  common.GlobalSeriesID
+	seriesID  []byte
 	timestamp time.Time
-	callback  func(common.GlobalSeriesID, time.Time, []byte, error)
+	callback  func([]byte, time.Time, []byte, error)
 	data      []byte
 }
 
 type logEntry struct {
 	timestamps  []time.Time
 	values      *list.List
-	seriesID    common.GlobalSeriesID
+	seriesID    []byte
 	entryLength uint64
 	count       uint32
 }
 
 type buffer struct {
-	timestampMap map[common.GlobalSeriesID][]time.Time
-	valueMap     map[common.GlobalSeriesID][]byte
-	callbackMap  map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)
+	timestampMap map[logSeriesID][]time.Time
+	valueMap     map[logSeriesID][]byte
+	callbackMap  map[logSeriesID][]func([]byte, time.Time, []byte, error)
 	count        int
 }
 
 type bufferWriter struct {
 	buf           *bytes.Buffer
-	seriesIDBuf   *bytes.Buffer
 	timestampsBuf *bytes.Buffer
+	seriesID      *logSeriesID
 	dataBuf       []byte
-	dataLen       int
-	seriesCount   uint32
 	batchLen      uint64
+	seriesCount   uint32
+	dataLen       int
+}
+
+type logSeriesID struct {
+	key     string
+	byteLen int
+}
+
+func newLogSeriesID(b []byte) logSeriesID {
+	return logSeriesID{key: convert.BytesToString(b), byteLen: len(b)}
+}
+
+func (s logSeriesID) string() string {
+	return s.key
+}
+
+func (s logSeriesID) bytes() []byte {
+	return convert.StringToBytes(s.key)
+}
+
+func (s logSeriesID) len() int {
+	return s.byteLen
+}
 
 // New creates a WAL instance in the specified path.
@@ -189,7 +212,7 @@ func New(path string, options *Options) (WAL, error) {
 			FileSize:            fileSize,
 			BufferSize:          bufferSize,
 			BufferBatchInterval: bufferBatchInterval,
-			NoSync:              options.NoSync,
+			SyncFlush:           options.SyncFlush,
 		}
 	}
 
@@ -216,9 +239,9 @@ func New(path string, options *Options) (WAL, error) {
 		flushCloser:     flushCloser,
 		chanGroupCloser: chanGroupCloser,
 		buffer: buffer{
-			timestampMap: make(map[common.GlobalSeriesID][]time.Time),
-			valueMap:     make(map[common.GlobalSeriesID][]byte),
-			callbackMap:  make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)),
+			timestampMap: make(map[logSeriesID][]time.Time),
+			valueMap:     make(map[logSeriesID][]byte),
+			callbackMap:  make(map[logSeriesID][]func([]byte, time.Time, []byte, error)),
 			count:        0,
 		},
 	}
@@ -234,7 +257,7 @@ func New(path string, options *Options) (WAL, error) {
 // Write a logging entity.
 // It will return immediately when the data is written in the buffer,
 // The callback function will be called when the entity is flushed on the persistent storage.
-func (log *log) Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error)) {
+func (log *log) Write(seriesID []byte, timestamp time.Time, data []byte, callback func([]byte, time.Time, []byte, error)) {
 	if !log.writeCloser.AddSender() {
 		return
 	}
@@ -266,6 +289,7 @@ func (log *log) ReadAllSegments() ([]Segment, error) {
 	for _, segment := range log.segmentMap {
 		segments = append(segments, segment)
 	}
+	sort.Slice(segments, func(i, j int) bool { return segments[i].GetSegmentID() < segments[j].GetSegmentID() })
 	return segments, nil
 }
 
@@ -365,7 +389,7 @@ func (log *log) start() {
 				log.logger.Debug().Msg("Write request to buffer. elements: " + strconv.Itoa(log.buffer.count))
elements: " + strconv.Itoa(log.buffer.count)) } - bufferVolume += request.seriesID.Volume() + timestampVolumeLength + len(request.data) + bufferVolume += len(request.seriesID) + timestampVolumeLength + len(request.data) if bufferVolume > log.options.BufferSize { log.triggerFlushing() bufferVolume = 0 @@ -440,9 +464,9 @@ func (log *log) triggerFlushing() { func (log *log) newBuffer() { log.buffer = buffer{ - timestampMap: make(map[common.GlobalSeriesID][]time.Time), - valueMap: make(map[common.GlobalSeriesID][]byte), - callbackMap: make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)), + timestampMap: make(map[logSeriesID][]time.Time), + valueMap: make(map[logSeriesID][]byte), + callbackMap: make(map[logSeriesID][]func([]byte, time.Time, []byte, error)), count: 0, } } @@ -481,7 +505,7 @@ func (log *log) writeWorkSegment(data []byte) error { if _, err := log.workSegment.file.Write(data); err != nil { return errors.Wrap(err, "Write WAL segment file error, file: "+log.workSegment.path) } - if !log.options.NoSync { + if log.options.SyncFlush { if err := log.workSegment.file.Sync(); err != nil { log.logger.Warn().Msg("Sync WAL segment file to disk failed, file: " + log.workSegment.path) } @@ -541,7 +565,6 @@ func (log *log) load() error { func newBufferWriter() *bufferWriter { return &bufferWriter{ buf: bytes.NewBuffer([]byte{}), - seriesIDBuf: bytes.NewBuffer([]byte{}), timestampsBuf: bytes.NewBuffer([]byte{}), dataBuf: make([]byte, 128), } @@ -558,14 +581,14 @@ func (w *bufferWriter) Reset() error { } func (w *bufferWriter) ResetSeries() { - w.seriesIDBuf.Reset() w.timestampsBuf.Reset() w.dataLen = 0 + w.seriesID = nil w.seriesCount = 0 } func (w *bufferWriter) AddSeries() error { - seriesIDBytesLen := uint16(w.seriesIDBuf.Len()) + seriesIDBytesLen := uint16(w.seriesID.len()) timestampsBytesLen := uint16(w.timestampsBuf.Len()) entryLen := seriesIDLength + uint64(seriesIDBytesLen) + seriesCountLength + timestampsBinaryLength + uint64(timestampsBytesLen) + uint64(w.dataLen) @@ -576,7 +599,7 @@ func (w *bufferWriter) AddSeries() error { if err = w.writeSeriesIDLength(seriesIDBytesLen); err != nil { return err } - if err = w.writeSeriesID(w.seriesIDBuf.Bytes()); err != nil { + if err = w.writeSeriesID(w.seriesID); err != nil { return err } if err = w.writeSeriesCount(w.seriesCount); err != nil { @@ -601,13 +624,8 @@ func (w *bufferWriter) Bytes() []byte { return w.rewriteBatchLength(batchBytes, batchLen) } -func (w *bufferWriter) WriteSeriesID(s common.GlobalSeriesID) error { - if err := writeUint64(w.seriesIDBuf, uint64(s.SeriesID)); err != nil { - return err - } - if _, err := w.seriesIDBuf.WriteString(s.Name); err != nil { - return err - } +func (w *bufferWriter) WriteSeriesID(seriesID logSeriesID) error { + w.seriesID = &seriesID return nil } @@ -661,8 +679,8 @@ func (w *bufferWriter) writeSeriesIDLength(data uint16) error { return writeUint16(w.buf, data) } -func (w *bufferWriter) writeSeriesID(data []byte) error { - _, err := w.buf.Write(data) +func (w *bufferWriter) writeSeriesID(data *logSeriesID) error { + _, err := w.buf.WriteString(data.string()) return err } @@ -713,7 +731,7 @@ func (segment *segment) parseLogEntries() error { var batchLen uint64 var entryLen uint64 var seriesIDLen uint16 - var seriesID common.GlobalSeriesID + var seriesID []byte var seriesCount uint32 var timestampsBinaryLen uint16 var entryEndPosition uint64 @@ -853,11 +871,8 @@ func (segment *segment) parseSeriesIDLength(data []byte) (uint16, error) { return seriesIDLen, nil } -func 
-	return common.GlobalSeriesID{
-		SeriesID: common.SeriesID(bytesToUint64(data[:8])),
-		Name:     string(data[8:]),
-	}
+func (segment *segment) parseSeriesID(data []byte) []byte {
+	return newLogSeriesID(data).bytes()
 }
 
 func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) {
@@ -910,7 +925,7 @@ func (segment *segment) parseValuesBinary(data []byte) (*list.List, error) {
 	return values, nil
 }
 
-func (logEntry *logEntry) GetSeriesID() common.GlobalSeriesID {
+func (logEntry *logEntry) GetSeriesID() []byte {
 	return logEntry.seriesID
 }
 
@@ -923,15 +938,15 @@ func (logEntry *logEntry) GetValues() *list.List {
 }
 
 func (buffer *buffer) write(request logRequest) {
-	seriesID := request.seriesID
-	buffer.timestampMap[seriesID] = append(buffer.timestampMap[seriesID], request.timestamp)
+	key := newLogSeriesID(request.seriesID)
+	buffer.timestampMap[key] = append(buffer.timestampMap[key], request.timestamp)
 
 	// Value item: binary-length(2-bytes) + binary data(n-bytes)
 	binaryLen := uint16(len(request.data))
-	buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], byte(binaryLen), byte(binaryLen>>8))
-	buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], request.data...)
+	buffer.valueMap[key] = append(buffer.valueMap[key], byte(binaryLen), byte(binaryLen>>8))
+	buffer.valueMap[key] = append(buffer.valueMap[key], request.data...)
 
-	buffer.callbackMap[seriesID] = append(buffer.callbackMap[seriesID], request.callback)
+	buffer.callbackMap[key] = append(buffer.callbackMap[key], request.callback)
 	buffer.count++
 }
 
@@ -946,9 +961,11 @@ func (buffer *buffer) notifyRequests(err error) {
 		valuePos = 0
 		for index, callback := range callbacks {
 			valuePos, valueItem = readValuesBinary(values, valuePos, valuesBinaryLength)
-			buffer.runningCallback(func() {
-				callback(seriesID, timestamps[index], valueItem, err)
-			})
+			if callback != nil {
+				buffer.runningCallback(func() {
+					callback(seriesID.bytes(), timestamps[index], valueItem, err)
+				})
+			}
 		}
 	}
 }
@@ -973,7 +990,7 @@ func parseSegmentID(segmentName string) (uint64, error) {
 	if !strings.HasSuffix(segmentName, segmentNameSuffix) {
 		return 0, errors.New("Invalid segment name: " + segmentName)
 	}
-	return strconv.ParseUint(segmentName[3:19], 10, 64)
+	return strconv.ParseUint(segmentName[3:19], 16, 64)
 }
 
 func readValuesBinary(raw []byte, position int, offsetLen int) (int, []byte) {
@@ -1051,10 +1068,6 @@ func bytesToUint16(buf []byte) uint16 {
 	return binary.LittleEndian.Uint16(buf)
 }
 
-func bytesToUint64(buf []byte) uint64 {
-	return binary.LittleEndian.Uint64(buf)
-}
-
 func timeToUnixNano(time time.Time) uint64 {
 	return uint64(time.UnixNano())
 }
diff --git a/pkg/wal/wal_benchmark_test.go b/pkg/wal/wal_benchmark_test.go
index d6eccc396..ba9570f23 100644
--- a/pkg/wal/wal_benchmark_test.go
+++ b/pkg/wal/wal_benchmark_test.go
@@ -27,7 +27,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -41,11 +40,11 @@ var (
 	seriesID100  = newSeriesIDList(100)
 	seriesID500  = newSeriesIDList(500)
 	seriesID1000 = newSeriesIDList(1000)
-	callback     = func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {}
+	callback     = func(seriesID []byte, t time.Time, bytes []byte, err error) {}
 )
 
 func Benchmark_SeriesID_1(b *testing.B) {
-	wal := newWAL(nil)
+	wal := newWAL(&Options{SyncFlush: true})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1
@@ -53,13 +52,13 @@ func Benchmark_SeriesID_1(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_20(b *testing.B) {
-	wal := newWAL(nil)
+	wal := newWAL(&Options{SyncFlush: true})
 	defer closeWAL(wal)
 
 	seriesID := seriesID20
@@ -67,13 +66,13 @@ func Benchmark_SeriesID_20(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_100(b *testing.B) {
-	wal := newWAL(nil)
+	wal := newWAL(&Options{SyncFlush: true})
 	defer closeWAL(wal)
 
 	seriesID := seriesID100
@@ -81,13 +80,13 @@ func Benchmark_SeriesID_100(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_500(b *testing.B) {
-	wal := newWAL(nil)
+	wal := newWAL(&Options{SyncFlush: true})
 	defer closeWAL(wal)
 
 	seriesID := seriesID500
@@ -95,13 +94,13 @@ func Benchmark_SeriesID_500(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000(b *testing.B) {
-	wal := newWAL(nil)
+	wal := newWAL(&Options{SyncFlush: true})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -109,13 +108,13 @@ func Benchmark_SeriesID_1000(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 64})
+	wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 64})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -123,13 +122,13 @@ func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 128})
+	wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 128})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -137,13 +136,13 @@ func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 512})
+	wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 512})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -151,13 +150,13 @@ func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 1024})
+	wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -165,13 +164,13 @@ func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2})
+	wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024 * 2})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -179,13 +178,13 @@ func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -193,13 +192,13 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 128, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 128, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -207,13 +206,13 @@ func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 512, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 512, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -221,13 +220,13 @@ func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 1024, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 1024, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -235,13 +234,13 @@ func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -249,13 +248,13 @@ func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
 	}
 	b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -281,7 +280,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B
 	}()
 	for i := 0; i < b.N; i++ {
 		binaryData = data[i%dataLen].binary
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
 
 		logVolume += seriesIDVolume + timeVolume + len(binaryData)
 		if logVolume >= rotateSize {
@@ -293,7 +292,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -319,7 +318,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B
 	}()
 	for i := 0; i < b.N; i++ {
 		binaryData = data[i%dataLen].binary
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
 
 		logVolume += seriesIDVolume + timeVolume + len(binaryData)
 		if logVolume >= rotateSize {
@@ -331,7 +330,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B) {
-	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
 	defer closeWAL(wal)
 
 	seriesID := seriesID1000
@@ -357,7 +356,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B
 	}()
 	for i := 0; i < b.N; i++ {
 		binaryData = data[i%dataLen].binary
-		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+		wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
 
 		logVolume += seriesIDVolume + timeVolume + len(binaryData)
 		if logVolume >= rotateSize {
@@ -398,13 +397,14 @@ func closeWAL(wal WAL) {
 	}
 }
 
-func newSeriesIDList(series int) []common.GlobalSeriesID {
-	var seriesIDSet []common.GlobalSeriesID
+type SeriesID struct {
+	key []byte
+}
+
+func newSeriesIDList(series int) []SeriesID {
+	var seriesIDSet []SeriesID
 	for i := 0; i < series; i++ {
-		seriesID := common.GlobalSeriesID{
-			SeriesID: common.SeriesID(i),
-			Name:     fmt.Sprintf("series-%d", i),
-		}
+		seriesID := SeriesID{key: []byte(fmt.Sprintf("series-%d", i))}
 		seriesIDSet = append(seriesIDSet, seriesID)
 	}
 	return seriesIDSet
diff --git a/pkg/wal/wal_test.go b/pkg/wal/wal_test.go
index f21cda561..d807f9704 100644
--- a/pkg/wal/wal_test.go
+++ b/pkg/wal/wal_test.go
@@ -31,7 +31,6 @@ import (
 	"github.com/onsi/gomega"
 	"github.com/onsi/gomega/gleak"
 
-	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/pkg/test/flags"
 	"github.com/apache/skywalking-banyandb/pkg/wal"
 )
@@ -83,20 +82,17 @@ var _ = ginkgo.Describe("WAL", func() {
 			wg.Add(writeLogCount)
 			baseTime := time.Now()
 			for i := 0; i < seriesIDCount; i++ {
-				seriesID := &common.GlobalSeriesID{
-					SeriesID: common.SeriesID(i),
-					Name:     fmt.Sprintf("series-%d", i),
-				}
+				seriesID := []byte(fmt.Sprintf("series-%d", i))
 				go func() {
 					for j := 0; j < seriesIDElementCount; j++ {
 						timestamp := time.UnixMilli(baseTime.UnixMilli() + int64(j))
 						value := []byte(fmt.Sprintf("value-%d", j))
-						callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
+						callback := func(seriesID []byte, t time.Time, bytes []byte, err error) {
 							gomega.Expect(err).ToNot(gomega.HaveOccurred())
 							wg.Done()
 						}
-						log.Write(*seriesID, timestamp, value, callback)
+						log.Write(seriesID, timestamp, value, callback)
 					}
 				}()
 			}
@@ -114,13 +110,8 @@ var _ = ginkgo.Describe("WAL", func() {
 			entries := segment.GetLogEntries()
 			for _, entity := range entries {
 				seriesID := entity.GetSeriesID()
-				seriesIDSequence := seriesID.SeriesID
-				expectSeriesID := common.GlobalSeriesID{
-					SeriesID: seriesIDSequence,
-					Name:     fmt.Sprintf("series-%d", seriesIDSequence),
-				}
 				// Check seriesID
-				gomega.Expect(expectSeriesID == seriesID).To(gomega.BeTrue())
+				gomega.Expect(seriesID != nil).To(gomega.BeTrue())
 
 				timestamps := entity.GetTimestamps()
 				values := entity.GetValues()
@@ -171,15 +162,12 @@ var _ = ginkgo.Describe("WAL", func() {
 			writeLogCount := 3
 			wg.Add(writeLogCount)
 
-			expectSegments := make(map[wal.SegmentID]common.GlobalSeriesID)
+			expectSegments := make(map[wal.SegmentID][]byte)
 			for i := 0; i < writeLogCount; i++ {
-				seriesID := &common.GlobalSeriesID{
-					SeriesID: common.SeriesID(i),
-					Name:     fmt.Sprintf("series-%d", i),
-				}
+				seriesID := []byte(fmt.Sprintf("series-%d", i))
 				timestamp := time.Now()
 				value := []byte(fmt.Sprintf("value-%d", i))
-				callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
+				callback := func(seriesID []byte, t time.Time, bytes []byte, err error) {
 					gomega.Expect(err).ToNot(gomega.HaveOccurred())
 
 					// Rotate
@@ -189,7 +177,7 @@ var _ = ginkgo.Describe("WAL", func() {
 					wg.Done()
 				}
-				log.Write(*seriesID, timestamp, value, callback)
+				log.Write(seriesID, timestamp, value, callback)
 			}
 			wg.Wait()
 
@@ -205,7 +193,7 @@ var _ = ginkgo.Describe("WAL", func() {
 				gomega.Expect(err).ToNot(gomega.HaveOccurred())
 				entries := segment.GetLogEntries()
 				gomega.Expect(len(entries) == 1).To(gomega.BeTrue())
-				gomega.Expect(entries[0].GetSeriesID() == seriesID).To(gomega.BeTrue())
+				gomega.Expect(bytes.Equal(entries[0].GetSeriesID(), seriesID)).To(gomega.BeTrue())
 			}
 		})
 	})
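
Below is a minimal usage sketch of the write/recover round trip that this patch enables. It is not part of the diff: the helper name reopenWithRecovery, the no-op flush callback, and the size parameters are illustrative assumptions; the NewBufferWithWal, Write, Read, and Close signatures are taken from the changes above.

package example

import (
	"time"

	"github.com/dgraph-io/badger/v3/skl"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/banyand/tsdb"
	"github.com/apache/skywalking-banyandb/pkg/logger"
)

// reopenWithRecovery exercises the recovery path: with enableWal=true every
// Buffer.Write is appended to <walPath>/buffer-<shard> before it reaches the
// in-memory skiplist, and NewBufferWithWal replays unflushed segments when a
// buffer is re-created over the same path.
func reopenWithRecovery(log *logger.Logger, walPath string) error {
	onFlush := func(shardIndex int, memTable *skl.Skiplist) error {
		// Persist the immutable skiplist here; the matching WAL segment is
		// deleted only after this callback returns nil.
		return nil
	}
	buf, err := tsdb.NewBufferWithWal(log, common.Position{}, 1<<20, 16, 2, onFlush, true, &walPath)
	if err != nil {
		return err
	}
	ts := time.Now()
	if err := buf.Write([]byte("series-1"), []byte("value-1"), ts); err != nil {
		return err // the WAL append failed, so the write was not accepted
	}
	if err := buf.Close(); err != nil {
		return err
	}
	// Re-creating the buffer over the same walPath replays the entry above
	// during construction, so it is readable again at the same timestamp.
	buf, err = tsdb.NewBufferWithWal(log, common.Position{}, 1<<20, 16, 2, onFlush, true, &walPath)
	if err != nil {
		return err
	}
	if _, exist := buf.Read([]byte("series-1"), ts); !exist {
		log.Warn().Msg("expected the recovered entry to be readable")
	}
	return buf.Close()
}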