Skip to content

Commit

Permalink
Fix issues found by smoke and stress tests (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Oct 16, 2023
1 parent 768baa1 commit d043b3a
Show file tree
Hide file tree
Showing 35 changed files with 512 additions and 146 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
*.jpeg binary
*.ico binary
*.gz binary
*.bz2 binary

go.sum merge=union
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ target

# okteto
.stignore

# profile result
*.prof
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ header: # `header` section is configurations for source codes license header.
- '**/ginkgo.report'
- 'ui'
- '.github/PULL_REQUEST_TEMPLATE'
- "**/*.prof"

comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.

Expand Down
2 changes: 1 addition & 1 deletion banyand/k8s.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ spec:
args:
- "standalone"
- "--measure-idx-batch-wait-sec=30"
- "--logging.level=warn"
- "--logging-level=info"
imagePullPolicy: Always
livenessProbe:
failureThreshold: 5
Expand Down
18 changes: 6 additions & 12 deletions banyand/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,6 @@ func TSSWithMemTableSize(sizeInBytes int64) TimeSeriesOptions {
}
}

// TSSWithTimeRange sets the time range of the time series.
func TSSWithTimeRange(timeRange timestamp.TimeRange) TimeSeriesOptions {
return func(store TimeSeriesStore) {
if btss, ok := store.(*badgerTSS); ok {
btss.timeRange = timeRange
}
}
}

// Iterator allows iterating the kv tables.
// TODO: use generic to provide a unique iterator.
type Iterator interface {
Expand Down Expand Up @@ -185,7 +176,7 @@ type IndexStore interface {

// OpenTimeSeriesStore creates a new TimeSeriesStore.
// nolint: contextcheck
func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
func OpenTimeSeriesStore(path string, timeRange timestamp.TimeRange, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
btss := new(badgerTSS)
btss.dbOpts = badger.DefaultOptions(path)
for _, opt := range options {
Expand All @@ -198,7 +189,8 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
WithInTable().
WithMaxLevels(2).
WithBaseTableSize(10 << 20).
WithBaseLevelSize(math.MaxInt64)
WithBaseLevelSize(math.MaxInt64).
WithBlockCacheSize(10 << 20)
if btss.dbOpts.MemTableSize < int64(defaultKVMemorySize) {
btss.dbOpts.MemTableSize = int64(defaultKVMemorySize)
}
Expand All @@ -212,6 +204,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
return nil, fmt.Errorf("failed to open time series store: %w", err)
}
btss.TSet = *badger.NewTSet(btss.db)
btss.timeRange = timeRange
return btss, nil
}

Expand Down Expand Up @@ -264,7 +257,8 @@ func OpenStore(path string, opts ...StoreOptions) (Store, error) {
WithBaseTableSize(5 << 20).
WithBaseLevelSize(25 << 20).
WithCompression(options.ZSTD).
WithZSTDCompressionLevel(1)
WithZSTDCompressionLevel(1).
WithBlockCacheSize(10 << 20)

var err error
bdb.db, err = badger.Open(bdb.dbOpts)
Expand Down
22 changes: 12 additions & 10 deletions banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,18 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
if !existed {
ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
if writeRequest.Metadata.ModRevision != measureCache.ModRevision {
ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled)
continue
if writeRequest.Metadata.ModRevision > 0 {
measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
if !existed {
ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
if writeRequest.Metadata.ModRevision != measureCache.ModRevision {
ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled)
continue
}
}
entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions banyand/liaison/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,18 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
if !existed {
s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled)
continue
if writeEntity.Metadata.ModRevision > 0 {
streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
if !existed {
s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
}
entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions banyand/measure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (s *service) FlagSet() *run.FlagSet {
flagS.Var(&s.BlockBufferSize, "measure-buffer-size", "block buffer size")
flagS.Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", "series metadata memory size")
flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, "measure-idx-batch-wait-sec", 1, "index batch wait in second")
flagS.BoolVar(&s.dbOpts.EnableWAL, "measure-enable-wal", true, "enable write ahead log")
return flagS
}

Expand Down
9 changes: 7 additions & 2 deletions banyand/measure/tstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

const (
Expand Down Expand Up @@ -87,7 +88,7 @@ func (t *tsTable) openBuffer() (err error) {
func (t *tsTable) Close() (err error) {
t.lock.Lock()
defer t.lock.Unlock()
for _, b := range []io.Closer{t.encoderBuffer, t.buffer, t.sst, t.encoderSST} {
for _, b := range []io.Closer{t.sst, t.encoderSST} {
if b != nil {
err = multierr.Append(err, b.Close())
}
Expand Down Expand Up @@ -179,9 +180,12 @@ type tsTableFactory struct {
compressionMethod databasev1.CompressionMethod
}

func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) {
func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string,
position common.Position, l *logger.Logger, timeRange timestamp.TimeRange,
) (tsdb.TSTable, error) {
encoderSST, err := kv.OpenTimeSeriesStore(
path.Join(root, encoded),
timeRange,
kv.TSSWithMemTableSize(ttf.bufferSize),
kv.TSSWithLogger(l.Named(encoded)),
kv.TSSWithEncoding(ttf.encoderPool, ttf.decoderPool, ttf.encodingChunkSize),
Expand All @@ -191,6 +195,7 @@ func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root
}
sst, err := kv.OpenTimeSeriesStore(
path.Join(root, plain),
timeRange,
kv.TSSWithMemTableSize(ttf.bufferSize),
kv.TSSWithLogger(l.Named(plain)),
kv.TSSWithZSTDCompression(int(ttf.plainChunkSize)),
Expand Down
18 changes: 11 additions & 7 deletions banyand/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ const (
)

// NewClient returns a new metadata client.
func NewClient(_ context.Context) (Service, error) {
return &clientService{closer: run.NewCloser(1)}, nil
func NewClient(forceRegisterNode bool) (Service, error) {
return &clientService{
closer: run.NewCloser(1),
forceRegisterNode: forceRegisterNode,
}, nil
}

type clientService struct {
namespace string
schemaRegistry schema.Registry
closer *run.Closer
endpoints []string
schemaRegistry schema.Registry
closer *run.Closer
namespace string
endpoints []string
forceRegisterNode bool
}

func (s *clientService) SchemaRegistry() schema.Registry {
Expand Down Expand Up @@ -100,7 +104,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
}
for {
ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo)
err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode)
cancel()
if errors.Is(err, context.DeadlineExceeded) {
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...")
Expand Down
33 changes: 20 additions & 13 deletions banyand/metadata/schema/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve
for i := 0; i < KindSize; i++ {
ki := Kind(1 << i)
if kind&ki > 0 {
e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering watcher")
e.l.Info().Str("name", name).Stringer("kind", ki).Msg("registering watcher")
w := e.newWatcher(name, ki, handler)
e.watchers = append(e.watchers, w)
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (boo
return false, nil
}

func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) error {
func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata, forced bool) error {
if !e.closer.AddRunning() {
return ErrClosed
}
Expand All @@ -334,18 +334,25 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) er
if err != nil {
return err
}
var ops []clientv3.Cmp
ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
txn := e.client.Txn(ctx).If(ops...)
txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID)))
txn = txn.Else(clientv3.OpGet(key))
response, err := txn.Commit()
if err != nil {
return err
}
if !response.Succeeded {
return errGRPCAlreadyExists
if forced {
if _, err = e.client.Put(ctx, key, string(val), clientv3.WithLease(lease.ID)); err != nil {
return err
}
} else {
var ops []clientv3.Cmp
ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
txn := e.client.Txn(ctx).If(ops...)
txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID)))
txn = txn.Else(clientv3.OpGet(key))
response, errCommit := txn.Commit()
if errCommit != nil {
return errCommit
}
if !response.Succeeded {
return errGRPCAlreadyExists
}
}

// Keep the lease alive
// nolint:contextcheck
keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID)
Expand Down
4 changes: 2 additions & 2 deletions banyand/metadata/schema/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role databasev1.Role)
return entities, nil
}

func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node) error {
func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node, forced bool) error {
return e.register(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindNode,
Name: node.Metadata.Name,
},
Spec: node,
})
}, forced)
}

func formatNodeKey(name string) string {
Expand Down
6 changes: 3 additions & 3 deletions banyand/metadata/schema/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var _ = ginkgo.Describe("etcd_register", func() {
})

ginkgo.It("should revoke the leaser", func() {
gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred())
gomega.Expect(r.register(context.Background(), md, true)).ShouldNot(gomega.HaveOccurred())
k, err := md.key()
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).ShouldNot(gomega.HaveOccurred())
Expand All @@ -86,7 +86,7 @@ var _ = ginkgo.Describe("etcd_register", func() {
})

ginkgo.It("should register only once", func() {
gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred())
gomega.Expect(r.register(context.Background(), md)).Should(gomega.MatchError(errGRPCAlreadyExists))
gomega.Expect(r.register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred())
gomega.Expect(r.register(context.Background(), md, false)).Should(gomega.MatchError(errGRPCAlreadyExists))
})
})
2 changes: 1 addition & 1 deletion banyand/metadata/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,5 +195,5 @@ type Property interface {
// Node allows CRUD node schemas in a group.
type Node interface {
ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error)
RegisterNode(ctx context.Context, node *databasev1.Node) error
RegisterNode(ctx context.Context, node *databasev1.Node, forced bool) error
}
4 changes: 2 additions & 2 deletions banyand/metadata/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ func (s *server) GracefulStop() {
}

// NewService returns a new metadata repository Service.
func NewService(ctx context.Context) (Service, error) {
func NewService(_ context.Context) (Service, error) {
s := &server{}
var err error
s.Service, err = NewClient(ctx)
s.Service, err = NewClient(true)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions banyand/stream/tstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

const (
Expand Down Expand Up @@ -117,8 +118,10 @@ type tsTableFactory struct {
chunkSize int
}

func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) {
sst, err := kv.OpenTimeSeriesStore(path.Join(root, id), kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)),
func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position,
l *logger.Logger, timeRange timestamp.TimeRange,
) (tsdb.TSTable, error) {
sst, err := kv.OpenTimeSeriesStore(path.Join(root, id), timeRange, kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)),
kv.TSSWithZSTDCompression(ttf.chunkSize))
if err != nil {
return nil, fmt.Errorf("failed to create time series table: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion banyand/tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (b *block) openSafely() (err error) {

func (b *block) open() (err error) {
if b.tsTable, err = b.openOpts.tsTableFactory.NewTSTable(b.openOpts.bufferSupplier,
b.path, b.position, b.l); err != nil {
b.path, b.position, b.l, b.TimeRange); err != nil {
return err
}
b.closableLst = append(b.closableLst, b.tsTable)
Expand Down
1 change: 0 additions & 1 deletion banyand/tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (s *shard) TriggerSchedule(task string) bool {

func (s *shard) Close() (err error) {
s.closeOnce.Do(func() {
_ = s.bufferSupplier.Close()
s.scheduler.Close()
s.segmentManageStrategy.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion banyand/tsdb/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func NewByPassTSTableFactory() TSTableFactory {
return bypassTSTableFactory{}
}

func (bypassTSTableFactory) NewTSTable(_ *BufferSupplier, _ string, _ common.Position, _ *logger.Logger) (TSTable, error) {
func (bypassTSTableFactory) NewTSTable(_ *BufferSupplier, _ string, _ common.Position, _ *logger.Logger, _ timestamp.TimeRange) (TSTable, error) {
return newBypassTSTable()
}

Expand Down
4 changes: 3 additions & 1 deletion banyand/tsdb/tstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

// TSTable is time series table.
Expand All @@ -43,5 +44,6 @@ type TSTable interface {
// TSTableFactory is the factory of TSTable.
type TSTableFactory interface {
// NewTSTable creates a new TSTable.
NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position, l *logger.Logger) (TSTable, error)
NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position,
l *logger.Logger, timeRange timestamp.TimeRange) (TSTable, error)
}
Loading

0 comments on commit d043b3a

Please sign in to comment.