diff --git a/.gitattributes b/.gitattributes index 28118cbe0..9643b431a 100644 --- a/.gitattributes +++ b/.gitattributes @@ -24,5 +24,6 @@ *.jpeg binary *.ico binary *.gz binary +*.bz2 binary go.sum merge=union diff --git a/.gitignore b/.gitignore index eaad4b524..6b001b0e7 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,6 @@ target # okteto .stignore + +# profile result +*.prof diff --git a/.licenserc.yaml b/.licenserc.yaml index 00e91e4a6..6f36ce3e5 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -84,6 +84,7 @@ header: # `header` section is configurations for source codes license header. - '**/ginkgo.report' - 'ui' - '.github/PULL_REQUEST_TEMPLATE' + - "**/*.prof" comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`. diff --git a/banyand/k8s.yml b/banyand/k8s.yml index 2b8f32fc3..a416b7a61 100644 --- a/banyand/k8s.yml +++ b/banyand/k8s.yml @@ -82,7 +82,7 @@ spec: args: - "standalone" - "--measure-idx-batch-wait-sec=30" - - "--logging.level=warn" + - "--logging-level=info" imagePullPolicy: Always livenessProbe: failureThreshold: 5 diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index 286757302..e61806835 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -148,15 +148,6 @@ func TSSWithMemTableSize(sizeInBytes int64) TimeSeriesOptions { } } -// TSSWithTimeRange sets the time range of the time series. -func TSSWithTimeRange(timeRange timestamp.TimeRange) TimeSeriesOptions { - return func(store TimeSeriesStore) { - if btss, ok := store.(*badgerTSS); ok { - btss.timeRange = timeRange - } - } -} - // Iterator allows iterating the kv tables. // TODO: use generic to provide a unique iterator. type Iterator interface { @@ -185,7 +176,7 @@ type IndexStore interface { // OpenTimeSeriesStore creates a new TimeSeriesStore. // nolint: contextcheck -func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) { +func OpenTimeSeriesStore(path string, timeRange timestamp.TimeRange, options ...TimeSeriesOptions) (TimeSeriesStore, error) { btss := new(badgerTSS) btss.dbOpts = badger.DefaultOptions(path) for _, opt := range options { @@ -198,7 +189,8 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS WithInTable(). WithMaxLevels(2). WithBaseTableSize(10 << 20). - WithBaseLevelSize(math.MaxInt64) + WithBaseLevelSize(math.MaxInt64). + WithBlockCacheSize(10 << 20) if btss.dbOpts.MemTableSize < int64(defaultKVMemorySize) { btss.dbOpts.MemTableSize = int64(defaultKVMemorySize) } @@ -212,6 +204,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS return nil, fmt.Errorf("failed to open time series store: %w", err) } btss.TSet = *badger.NewTSet(btss.db) + btss.timeRange = timeRange return btss, nil } @@ -264,7 +257,8 @@ func OpenStore(path string, opts ...StoreOptions) (Store, error) { WithBaseTableSize(5 << 20). WithBaseLevelSize(25 << 20). WithCompression(options.ZSTD). - WithZSTDCompressionLevel(1) + WithZSTDCompressionLevel(1). + WithBlockCacheSize(10 << 20) var err error bdb.db, err = badger.Open(bdb.dbOpts) diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 2a32d29de..ebb5c58f7 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -88,16 +88,18 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled) continue } - measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata())) - if !existed { - ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found") - reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled) - continue - } - if writeRequest.Metadata.ModRevision != measureCache.ModRevision { - ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired") - reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled) - continue + if writeRequest.Metadata.ModRevision > 0 { + measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata())) + if !existed { + ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found") + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled) + continue + } + if writeRequest.Metadata.ModRevision != measureCache.ModRevision { + ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired") + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled) + continue + } } entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies()) if err != nil { diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index aa3d49a42..d2aca9768 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -88,16 +88,18 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, s.sampled) continue } - streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata())) - if !existed { - s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found") - reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled) - continue - } - if writeEntity.Metadata.ModRevision != streamCache.ModRevision { - s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired") - reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled) - continue + if writeEntity.Metadata.ModRevision > 0 { + streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata())) + if !existed { + s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled) + continue + } + if writeEntity.Metadata.ModRevision != streamCache.ModRevision { + s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled) + continue + } } entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies()) if err != nil { diff --git a/banyand/measure/service.go b/banyand/measure/service.go index 43940d8fe..98a9e1012 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -87,6 +87,7 @@ func (s *service) FlagSet() *run.FlagSet { flagS.Var(&s.BlockBufferSize, "measure-buffer-size", "block buffer size") flagS.Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", "series metadata memory size") flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, "measure-idx-batch-wait-sec", 1, "index batch wait in second") + flagS.BoolVar(&s.dbOpts.EnableWAL, "measure-enable-wal", true, "enable write ahead log") return flagS } diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index 28ca9561b..1e0723adf 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -37,6 +37,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) const ( @@ -87,7 +88,7 @@ func (t *tsTable) openBuffer() (err error) { func (t *tsTable) Close() (err error) { t.lock.Lock() defer t.lock.Unlock() - for _, b := range []io.Closer{t.encoderBuffer, t.buffer, t.sst, t.encoderSST} { + for _, b := range []io.Closer{t.sst, t.encoderSST} { if b != nil { err = multierr.Append(err, b.Close()) } @@ -179,9 +180,12 @@ type tsTableFactory struct { compressionMethod databasev1.CompressionMethod } -func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) { +func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, + position common.Position, l *logger.Logger, timeRange timestamp.TimeRange, +) (tsdb.TSTable, error) { encoderSST, err := kv.OpenTimeSeriesStore( path.Join(root, encoded), + timeRange, kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(encoded)), kv.TSSWithEncoding(ttf.encoderPool, ttf.decoderPool, ttf.encodingChunkSize), @@ -191,6 +195,7 @@ func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root } sst, err := kv.OpenTimeSeriesStore( path.Join(root, plain), + timeRange, kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(plain)), kv.TSSWithZSTDCompression(int(ttf.plainChunkSize)), diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index 35ecbcec9..411b3c03e 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -40,15 +40,19 @@ const ( ) // NewClient returns a new metadata client. -func NewClient(_ context.Context) (Service, error) { - return &clientService{closer: run.NewCloser(1)}, nil +func NewClient(forceRegisterNode bool) (Service, error) { + return &clientService{ + closer: run.NewCloser(1), + forceRegisterNode: forceRegisterNode, + }, nil } type clientService struct { - namespace string - schemaRegistry schema.Registry - closer *run.Closer - endpoints []string + schemaRegistry schema.Registry + closer *run.Closer + namespace string + endpoints []string + forceRegisterNode bool } func (s *clientService) SchemaRegistry() schema.Registry { @@ -100,7 +104,7 @@ func (s *clientService) PreRun(ctx context.Context) error { } for { ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10) - err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo) + err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode) cancel() if errors.Is(err, context.DeadlineExceeded) { l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...") diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 8b90490e5..3abb630a7 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -93,7 +93,7 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve for i := 0; i < KindSize; i++ { ki := Kind(1 << i) if kind&ki > 0 { - e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering watcher") + e.l.Info().Str("name", name).Stringer("kind", ki).Msg("registering watcher") w := e.newWatcher(name, ki, handler) e.watchers = append(e.watchers, w) } @@ -315,7 +315,7 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (boo return false, nil } -func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) error { +func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata, forced bool) error { if !e.closer.AddRunning() { return ErrClosed } @@ -334,18 +334,25 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) er if err != nil { return err } - var ops []clientv3.Cmp - ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) - txn := e.client.Txn(ctx).If(ops...) - txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID))) - txn = txn.Else(clientv3.OpGet(key)) - response, err := txn.Commit() - if err != nil { - return err - } - if !response.Succeeded { - return errGRPCAlreadyExists + if forced { + if _, err = e.client.Put(ctx, key, string(val), clientv3.WithLease(lease.ID)); err != nil { + return err + } + } else { + var ops []clientv3.Cmp + ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + txn := e.client.Txn(ctx).If(ops...) + txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID))) + txn = txn.Else(clientv3.OpGet(key)) + response, errCommit := txn.Commit() + if errCommit != nil { + return errCommit + } + if !response.Succeeded { + return errGRPCAlreadyExists + } } + // Keep the lease alive // nolint:contextcheck keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go index 575b8c89b..d0c4a0bd1 100644 --- a/banyand/metadata/schema/node.go +++ b/banyand/metadata/schema/node.go @@ -47,14 +47,14 @@ func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role databasev1.Role) return entities, nil } -func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node) error { +func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node, forced bool) error { return e.register(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindNode, Name: node.Metadata.Name, }, Spec: node, - }) + }, forced) } func formatNodeKey(name string) string { diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go index 9f05a9990..45e096d90 100644 --- a/banyand/metadata/schema/register_test.go +++ b/banyand/metadata/schema/register_test.go @@ -72,7 +72,7 @@ var _ = ginkgo.Describe("etcd_register", func() { }) ginkgo.It("should revoke the leaser", func() { - gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.register(context.Background(), md, true)).ShouldNot(gomega.HaveOccurred()) k, err := md.key() gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).ShouldNot(gomega.HaveOccurred()) @@ -86,7 +86,7 @@ var _ = ginkgo.Describe("etcd_register", func() { }) ginkgo.It("should register only once", func() { - gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(r.register(context.Background(), md)).Should(gomega.MatchError(errGRPCAlreadyExists)) + gomega.Expect(r.register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.register(context.Background(), md, false)).Should(gomega.MatchError(errGRPCAlreadyExists)) }) }) diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index dd4570ca5..3a35c13f3 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -195,5 +195,5 @@ type Property interface { // Node allows CRUD node schemas in a group. type Node interface { ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error) - RegisterNode(ctx context.Context, node *databasev1.Node) error + RegisterNode(ctx context.Context, node *databasev1.Node, forced bool) error } diff --git a/banyand/metadata/server.go b/banyand/metadata/server.go index a4c59359a..c96398f83 100644 --- a/banyand/metadata/server.go +++ b/banyand/metadata/server.go @@ -89,10 +89,10 @@ func (s *server) GracefulStop() { } // NewService returns a new metadata repository Service. -func NewService(ctx context.Context) (Service, error) { +func NewService(_ context.Context) (Service, error) { s := &server{} var err error - s.Service, err = NewClient(ctx) + s.Service, err = NewClient(true) if err != nil { return nil, err } diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index 1cdc4fa7a..4f262b0f7 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) const ( @@ -117,8 +118,10 @@ type tsTableFactory struct { chunkSize int } -func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) { - sst, err := kv.OpenTimeSeriesStore(path.Join(root, id), kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)), +func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position, + l *logger.Logger, timeRange timestamp.TimeRange, +) (tsdb.TSTable, error) { + sst, err := kv.OpenTimeSeriesStore(path.Join(root, id), timeRange, kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)), kv.TSSWithZSTDCompression(ttf.chunkSize)) if err != nil { return nil, fmt.Errorf("failed to create time series table: %w", err) diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 32e4d5218..a85965081 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -202,7 +202,7 @@ func (b *block) openSafely() (err error) { func (b *block) open() (err error) { if b.tsTable, err = b.openOpts.tsTableFactory.NewTSTable(b.openOpts.bufferSupplier, - b.path, b.position, b.l); err != nil { + b.path, b.position, b.l, b.TimeRange); err != nil { return err } b.closableLst = append(b.closableLst, b.tsTable) diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index b7841837e..5ab48ca0e 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -244,7 +244,6 @@ func (s *shard) TriggerSchedule(task string) bool { func (s *shard) Close() (err error) { s.closeOnce.Do(func() { - _ = s.bufferSupplier.Close() s.scheduler.Close() s.segmentManageStrategy.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index ebe299b77..8c2dbd711 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -141,7 +141,7 @@ func NewByPassTSTableFactory() TSTableFactory { return bypassTSTableFactory{} } -func (bypassTSTableFactory) NewTSTable(_ *BufferSupplier, _ string, _ common.Position, _ *logger.Logger) (TSTable, error) { +func (bypassTSTableFactory) NewTSTable(_ *BufferSupplier, _ string, _ common.Position, _ *logger.Logger, _ timestamp.TimeRange) (TSTable, error) { return newBypassTSTable() } diff --git a/banyand/tsdb/tstable.go b/banyand/tsdb/tstable.go index 9aab9bb00..d23a5b8cd 100644 --- a/banyand/tsdb/tstable.go +++ b/banyand/tsdb/tstable.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) // TSTable is time series table. @@ -43,5 +44,6 @@ type TSTable interface { // TSTableFactory is the factory of TSTable. type TSTableFactory interface { // NewTSTable creates a new TSTable. - NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position, l *logger.Logger) (TSTable, error) + NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position, + l *logger.Logger, timeRange timestamp.TimeRange) (TSTable, error) } diff --git a/go.mod b/go.mod index 991bfa802..10bd2daa0 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect - github.com/dustin/go-humanize v1.0.1 // indirect + github.com/dustin/go-humanize v1.0.1 github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go index a6ec86a6a..7932f7113 100644 --- a/pkg/cmdsetup/data.go +++ b/pkg/cmdsetup/data.go @@ -38,7 +38,7 @@ import ( func newDataCmd(runners ...run.Unit) *cobra.Command { l := logger.GetLogger("bootstrap") ctx := context.Background() - metaSvc, err := metadata.NewClient(ctx) + metaSvc, err := metadata.NewClient(false) if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index b0c039858..c2c8eec5b 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -40,7 +40,7 @@ import ( func newLiaisonCmd(runners ...run.Unit) *cobra.Command { l := logger.GetLogger("bootstrap") ctx := context.Background() - metaSvc, err := metadata.NewClient(ctx) + metaSvc, err := metadata.NewClient(false) if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 5601d94a6..2437a6275 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -25,6 +25,7 @@ import ( "io" "log" "math" + "sync/atomic" "time" "github.com/blugelabs/bluge" @@ -90,13 +91,13 @@ type store struct { ch chan any closer *run.Closer l *logger.Logger + errClosing atomic.Pointer[error] batchInterval time.Duration } // NewStore create a new inverted index repository. func NewStore(opts StoreOpts) (index.Store, error) { - indexConfig := blugeIndex.DefaultConfig(opts.Path).WithUnsafeBatches(). - WithPersisterNapTimeMSec(60 * 1000) + indexConfig := blugeIndex.DefaultConfig(opts.Path) indexConfig.MergePlanOptions.MaxSegmentsPerTier = 1 indexConfig.MergePlanOptions.MaxSegmentSize = 500000 indexConfig.MergePlanOptions.SegmentsPerMergeTask = 20 @@ -124,7 +125,10 @@ func NewStore(opts StoreOpts) (index.Store, error) { func (s *store) Close() error { s.closer.CloseThenWait() - return s.writer.Close() + if s.errClosing.Load() != nil { + return *s.errClosing.Load() + } + return nil } func (s *store) Write(fields []index.Field, docID uint64) error { @@ -291,7 +295,12 @@ func (s *store) SizeOnDisk() int64 { func (s *store) run() { go func() { - defer s.closer.Done() + defer func() { + if err := s.writer.Close(); err != nil { + s.errClosing.Store(&err) + } + s.closer.Done() + }() size := 0 batch := bluge.NewBatch() flush := func() { @@ -304,6 +313,7 @@ func (s *store) run() { batch.Reset() size = 0 } + defer flush() var docIDBuffer bytes.Buffer for { timer := time.NewTimer(s.batchInterval) diff --git a/pkg/query/logical/measure/measure_plan_groupby.go b/pkg/query/logical/measure/measure_plan_groupby.go index 9f62565d4..067577c86 100644 --- a/pkg/query/logical/measure/measure_plan_groupby.go +++ b/pkg/query/logical/measure/measure_plan_groupby.go @@ -154,6 +154,12 @@ func formatGroupByKey(point *measurev1.DataPoint, groupByTagsRefs [][]*logical.T hash := xxhash.New() for _, tagFamilyRef := range groupByTagsRefs { for _, tagRef := range tagFamilyRef { + if tagRef.Spec.TagFamilyIdx >= len(point.GetTagFamilies()) { + return 0, errors.New("tag family index out of range") + } + if tagRef.Spec.TagIdx >= len(point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()) { + return 0, errors.New("tag index out of range") + } tag := point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()[tagRef.Spec.TagIdx] switch v := tag.GetValue().GetValue().(type) { case *modelv1.TagValue_Str: diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index b3159f902..8fdcbfec2 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -71,6 +71,32 @@ func StandaloneWithSchemaLoaders(schemaLoaders []SchemaLoader, certFile, keyFile var ports []int ports, err = test.AllocateFreePorts(4) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + addr, httpAddr, closeFn := standaloneServer(path, ports, schemaLoaders, certFile, keyFile, flags...) + return addr, httpAddr, func() { + closeFn() + deferFn() + } +} + +// ClosableStandalone wires standalone modules to build a testing ready runtime. +func ClosableStandalone(path string, ports []int, flags ...string) (string, string, func()) { + return standaloneServer(path, ports, []SchemaLoader{ + &preloadService{name: "stream"}, + &preloadService{name: "measure"}, + }, "", "", flags...) +} + +// ClosableStandaloneWithSchemaLoaders wires standalone modules to build a testing ready runtime. +func ClosableStandaloneWithSchemaLoaders(path string, ports []int, schemaLoaders []SchemaLoader, flags ...string) (string, string, func()) { + return standaloneServer(path, ports, schemaLoaders, "", "", flags...) +} + +// EmptyClosableStandalone wires standalone modules to build a testing ready runtime. +func EmptyClosableStandalone(path string, ports []int, flags ...string) (string, string, func()) { + return standaloneServer(path, ports, nil, "", "", flags...) +} + +func standaloneServer(path string, ports []int, schemaLoaders []SchemaLoader, certFile, keyFile string, flags ...string) (string, string, func()) { addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) endpoint := fmt.Sprintf("http://%s:%d", host, ports[2]) @@ -127,10 +153,7 @@ func StandaloneWithSchemaLoaders(schemaLoaders []SchemaLoader, certFile, keyFile err = preloadGroup.Run(context.Background()) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } - return addr, httpAddr, func() { - closeFn() - deferFn() - } + return addr, httpAddr, closeFn } // SchemaLoader is a service that can preload schema. diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go index db8f416d6..0d9fac6c1 100644 --- a/pkg/wal/wal.go +++ b/pkg/wal/wal.go @@ -30,6 +30,7 @@ import ( "strconv" "strings" "sync" + "syscall" "time" "github.com/golang/snappy" @@ -332,13 +333,16 @@ func (log *log) Delete(segmentID SegmentID) error { if segmentID == log.workSegment.segmentID { return errors.New("Can not delete the segment which is working") } - + defer delete(log.segmentMap, segmentID) err := os.Remove(log.segmentMap[segmentID].path) - if err != nil { - return errors.Wrap(err, "Delete WAL segment error") + if err == nil { + return nil } - delete(log.segmentMap, segmentID) - return nil + var pathErr *os.PathError + if errors.As(err, &pathErr) && errors.Is(pathErr.Err, syscall.ENOENT) { + return nil + } + return errors.Wrap(err, "Delete WAL segment error") } // Close all of segments and stop WAL work. diff --git a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go new file mode 100644 index 000000000..ac89d1087 --- /dev/null +++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go @@ -0,0 +1,105 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_query_ondisk_test + +import ( + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + "github.com/apache/skywalking-banyandb/pkg/test/setup" + "github.com/apache/skywalking-banyandb/pkg/timestamp" + test_cases "github.com/apache/skywalking-banyandb/test/cases" + casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" + casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" + casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" + integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" +) + +func TestIntegrationQueryOnDisk(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Integration Query OnDisk Suite", Label(integration_standalone.Labels...)) +} + +var ( + connection *grpc.ClientConn + now time.Time + deferFunc func() + goods []gleak.Goroutine +) + +var _ = SynchronizedBeforeSuite(func() []byte { + goods = gleak.Goroutines() + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(Succeed()) + path, diskCleanupFn, err := test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + var ports []int + ports, err = test.AllocateFreePorts(4) + Expect(err).NotTo(HaveOccurred()) + addr, _, closeFunc := setup.ClosableStandalone(path, ports) + ns := timestamp.NowMilli().UnixNano() + now = time.Unix(0, ns-ns%int64(time.Minute)) + test_cases.Initialize(addr, now) + closeFunc() + time.Sleep(time.Second) + addr, _, closeFunc = setup.EmptyClosableStandalone(path, ports) + deferFunc = func() { + closeFunc() + diskCleanupFn() + } + return []byte(addr) +}, func(address []byte) { + var err error + connection, err = grpchelper.Conn(string(address), 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + casesstream.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: now, + } + casesmeasure.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: now, + } + casestopn.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: now, + } + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = SynchronizedAfterSuite(func() { + if connection != nil { + Expect(connection.Close()).To(Succeed()) + } +}, func() { + deferFunc() + Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) +}) diff --git a/test/stress/cases/istio/cpu.prof b/test/stress/cases/istio/cpu.prof new file mode 100644 index 000000000..e69de29bb diff --git a/test/stress/cases/istio/heap.prof b/test/stress/cases/istio/heap.prof new file mode 100644 index 000000000..e69de29bb diff --git a/test/stress/cases/istio/istio_suite_test.go b/test/stress/cases/istio/istio_suite_test.go index bab730bc9..b6ac0957b 100644 --- a/test/stress/cases/istio/istio_suite_test.go +++ b/test/stress/cases/istio/istio_suite_test.go @@ -24,20 +24,25 @@ import ( "fmt" "io" "os" + "path/filepath" + "runtime/pprof" "testing" "time" + "github.com/dustin/go-humanize" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/stats" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/timestamppb" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" @@ -49,6 +54,41 @@ func TestIstio(t *testing.T) { RunSpecs(t, "Istio Suite", Label("integration", "slow")) } +var ( + cpuProfileFile *os.File + heapProfileFile *os.File +) + +var _ = BeforeSuite(func() { + // Create CPU profile file + var err error + cpuProfileFile, err = os.Create("cpu.prof") + Expect(err).NotTo(HaveOccurred()) + + // Start CPU profiling + err = pprof.StartCPUProfile(cpuProfileFile) + Expect(err).NotTo(HaveOccurred()) + + // Create heap profile file + heapProfileFile, err = os.Create("heap.prof") + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = AfterSuite(func() { + // Stop CPU profiling + pprof.StopCPUProfile() + + // Write heap profile + err := pprof.WriteHeapProfile(heapProfileFile) + Expect(err).NotTo(HaveOccurred()) + + // Close profile files + err = cpuProfileFile.Close() + Expect(err).NotTo(HaveOccurred()) + err = heapProfileFile.Close() + Expect(err).NotTo(HaveOccurred()) +}) + var _ = Describe("Istio", func() { BeforeEach(func() { Expect(logger.Init(logger.Logging{ @@ -57,20 +97,41 @@ var _ = Describe("Istio", func() { })).To(Succeed()) }) It("should pass", func() { - addr, _, deferFunc := setup.StandaloneWithSchemaLoaders([]setup.SchemaLoader{&preloadService{name: "oap"}}, "", "") - DeferCleanup(deferFunc) + path, deferFn, err := test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + DeferCleanup(func() { + printDiskUsage(path+"/measure", 5, 0) + deferFn() + }) + var ports []int + ports, err = test.AllocateFreePorts(4) + Expect(err).NotTo(HaveOccurred()) + addr, _, closerServerFunc := setup.ClosableStandaloneWithSchemaLoaders( + path, ports, + []setup.SchemaLoader{&preloadService{name: "oap"}}, + "--logging-level", "info") + DeferCleanup(closerServerFunc) Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), flags.EventuallyTimeout).Should(Succeed()) - conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) + bc := &clientCounter{} + conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(bc)) Expect(err).NotTo(HaveOccurred()) DeferCleanup(func() { conn.Close() }) - Expect(ReadAndWriteFromFile(extractData(), conn)).To(Succeed()) + startTime := time.Now() + writtenCount, err := ReadAndWriteFromFile(extractData(), conn) + Expect(err).To(Succeed()) + endTime := time.Now() + + fmt.Printf("written %d items in %s\n", writtenCount, endTime.Sub(startTime).String()) + fmt.Printf("throughput: %f items/s\n", float64(writtenCount)/endTime.Sub(startTime).Seconds()) + fmt.Printf("throughput(kb/s) %f\n", float64(bc.bytesSent)/endTime.Sub(startTime).Seconds()/1024) + fmt.Printf("latency: %s\n", bc.totalLatency/time.Duration(writtenCount)) }) }) -func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error { +func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) (int, error) { // Open the file for reading l := logger.GetLogger("load_test") @@ -97,16 +158,23 @@ func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error { ctx := context.Background() client, err := c.Write(ctx) if err != nil { - return fmt.Errorf("failed to create write client: %w", err) + return 0, fmt.Errorf("failed to create write client: %w", err) } + writeCount := 0 flush := func(createClient bool) error { if errClose := client.CloseSend(); errClose != nil { return fmt.Errorf("failed to close send: %w", errClose) } bulkSize = 2000 - _, err = client.Recv() - if err != nil && errors.Is(err, io.EOF) { - return fmt.Errorf("failed to receive client: %w", err) + writeCount += 2000 + for i := 0; i < 2000; i++ { + _, err = client.Recv() + if err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("failed to receive client: %w", err) + } + if errors.Is(err, io.EOF) { + break + } } if !createClient { return nil @@ -140,14 +208,14 @@ func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error { } jsonMsg, errRead := reader.ReadString('\n') if errRead != nil && errRead.Error() != "EOF" { - return fmt.Errorf("failed to read line from file: %w", errRead) + return fmt.Errorf("line %d failed to read line from file: %w", 2000-bulkSize, errRead) } if errRead != nil && errRead.Error() == "EOF" { break } var req measurev1.WriteRequest if errUnmarshal := protojson.Unmarshal([]byte(jsonMsg), &req); errUnmarshal != nil { - return fmt.Errorf("failed to unmarshal JSON message: %w", errUnmarshal) + return fmt.Errorf("line %d failed to unmarshal JSON message: %w", 2000-bulkSize, errUnmarshal) } req.MessageId = uint64(time.Now().UnixNano()) @@ -175,8 +243,68 @@ func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error { } for i := 0; i < 40; i++ { if err = loop(i); err != nil { + return writeCount, err + } + } + return writeCount, flush(false) +} + +func printDiskUsage(dir string, maxDepth, curDepth int) { + // Calculate the total size of all files and directories within the directory + var totalSize int64 + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { return err } + if info.Mode().IsRegular() { + totalSize += info.Size() + } + return nil + }) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + return + } + + // Print the disk usage of the current directory + fmt.Printf("%s: %s\n", dir, humanize.Bytes(uint64(totalSize))) + + // Recursively print the disk usage of subdirectories + if curDepth < maxDepth { + files, err := os.ReadDir(dir) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + return + } + for _, file := range files { + if file.IsDir() { + subdir := filepath.Join(dir, file.Name()) + printDiskUsage(subdir, maxDepth, curDepth+1) + } + } } - return flush(false) +} + +type clientCounter struct { + bytesSent int + totalLatency time.Duration +} + +func (*clientCounter) HandleConn(context.Context, stats.ConnStats) {} + +func (c *clientCounter) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + return ctx +} + +func (c *clientCounter) HandleRPC(_ context.Context, s stats.RPCStats) { + switch s := s.(type) { + case *stats.OutPayload: + c.bytesSent += s.WireLength + case *stats.End: + c.totalLatency += s.EndTime.Sub(s.BeginTime) + } +} + +func (c *clientCounter) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx } diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go index 89ac54aa5..f0de5feaa 100644 --- a/test/stress/cases/istio/repo.go +++ b/test/stress/cases/istio/repo.go @@ -20,7 +20,7 @@ package istio import ( "archive/tar" "bytes" - "compress/gzip" + "compress/bzip2" "context" "embed" "encoding/json" @@ -51,13 +51,23 @@ func extractData() string { // Create a subdirectory called "tmp" in the temporary directory tmpSubDir := filepath.Join(tmpDir, "testdata") + target := filepath.Join(tmpSubDir, "access.log") + if _, err := os.Stat(target); err == nil { + absPath, err := filepath.Abs(target) + if err != nil { + fmt.Printf("Error getting absolute path: %v\n", err) + os.Exit(1) + } + return absPath + } err := os.MkdirAll(tmpSubDir, 0o755) if err != nil { fmt.Fprintf(os.Stderr, "Error creating tmp directory: %v\n", err) os.Exit(1) } var data []byte - if data, err = store.ReadFile("testdata/access.tar.gz"); err != nil { + if data, err = store.ReadFile("testdata/access.tar.bz2"); err != nil { + fmt.Printf("Error reading file: %v\n", err) os.Exit(1) } filePath, err := extractTarGz(data, tmpSubDir) @@ -69,13 +79,8 @@ func extractData() string { } func extractTarGz(src []byte, dest string) (string, error) { - gzReader, err := gzip.NewReader(io.Reader(bytes.NewReader(src))) - if err != nil { - return "", err - } - defer gzReader.Close() - - tarReader := tar.NewReader(gzReader) + bzReader := bzip2.NewReader(io.Reader(bytes.NewReader(src))) + tarReader := tar.NewReader(bzReader) for { header, err := tarReader.Next() diff --git a/test/stress/cases/istio/report.md b/test/stress/cases/istio/report.md index 83d985d61..ee76e41e9 100644 --- a/test/stress/cases/istio/report.md +++ b/test/stress/cases/istio/report.md @@ -1,52 +1,113 @@ # Testing Report +## Scenario + +### SkyWalking Entities + +Service: 256 +Instances: 2048, 8 per service + + +### Demo cluster + +Traffic RPS: 4352 +VUS: 8192 + ## Result +written 16186000 items in 38m43.221090505s +throughput: 6967.051077 items/s +throughput(kb/s) 1782.985321 +latency: 2ns + +## CPU Profile + +CPU Usage: 324% + ```bash -Ran 1 of 1 Specs in 2384.312 seconds -SUCCESS! -- 1 Passed | 0 Failed | 0 Pending | 0 Skipped -PASS +Showing top 10 nodes out of 300 + flat flat% sum% cum cum% + 348.58s 4.62% 4.62% 383.07s 5.08% runtime.findObject + 272.92s 3.62% 8.24% 272.92s 3.62% runtime.memmove + 240.53s 3.19% 11.43% 240.53s 3.19% runtime/internal/syscall.Syscall6 + 239.05s 3.17% 14.60% 239.05s 3.17% runtime.memclrNoHeapPointers + 210.82s 2.80% 17.40% 340.64s 4.52% github.com/klauspost/compress/zstd.(*doubleFastEncoder).Encode + 189.80s 2.52% 19.92% 1111.75s 14.74% runtime.mallocgc + 182.17s 2.42% 22.33% 687.47s 9.12% runtime.scanobject + 134.93s 1.79% 24.12% 202.49s 2.69% github.com/dgraph-io/badger/v3/table.(*MergeIterator).Value + 116.62s 1.55% 25.67% 116.62s 1.55% runtime.nextFreeFast (inline) + 110.73s 1.47% 27.14% 110.73s 1.47% github.com/klauspost/compress/zstd.sequenceDecs_decodeSync_bmi2 +``` + +From the top 10 list, we can see that the CPU is mainly used by `compaction`. + +## Heap Profile -Ginkgo ran 1 suite in 39m47.81357862s +Heap Size: 1.2GB + +```bash +Showing top 10 nodes out of 104 + flat flat% sum% cum cum% + 690.27MB 53.22% 53.22% 690.27MB 53.22% github.com/dgraph-io/ristretto/z.Calloc (inline) + 172.07MB 13.27% 66.48% 172.07MB 13.27% runtime.malg + 128MB 9.87% 76.35% 128MB 9.87% github.com/klauspost/compress/zstd.(*fastBase).ensureHist (inline) + 78.98MB 6.09% 82.44% 78.98MB 6.09% github.com/dgraph-io/badger/v3/skl.newArena + 57.51MB 4.43% 86.87% 141.71MB 10.92% github.com/dgraph-io/badger/v3/table.(*Builder).addHelper.func1 + 36.02MB 2.78% 89.65% 177.73MB 13.70% github.com/dgraph-io/badger/v3/table.(*Builder).addHelper + 28.97MB 2.23% 91.88% 28.97MB 2.23% runtime/pprof.(*profMap).lookup + 26.50MB 2.04% 93.93% 757.59MB 58.41% github.com/dgraph-io/badger/v3/table.(*Builder).addInternal + 8.21MB 0.63% 94.56% 8.21MB 0.63% github.com/klauspost/compress/zstd.encoderOptions.encoder + 4MB 0.31% 94.87% 48.50MB 3.74% github.com/dgraph-io/badger/v3/table.(*Table).block ``` +From the top 10 list, we can see that the memory is mainly used by write `buffer(skl)` and `compaction(table)`. +Especially, the compaction includes several table related operations, such as `table.(*Builder).addHelper`, +consumes most of the memory. + + ## Disk Usage ```bash -4.0K ./measure-minute/shard-0/seg-20230626/index -24M ./measure-minute/shard-0/seg-20230626/block-2023062607/lsm -17M ./measure-minute/shard-0/seg-20230626/block-2023062607/encoded -26M ./measure-minute/shard-0/seg-20230626/block-2023062607/tst -66M ./measure-minute/shard-0/seg-20230626/block-2023062607 -66M ./measure-minute/shard-0/seg-20230626 -4.4M ./measure-minute/shard-0/series/md -2.2M ./measure-minute/shard-0/series/inverted -20K ./measure-minute/shard-0/series/lsm -6.6M ./measure-minute/shard-0/series -72M ./measure-minute/shard-0 -4.0K ./measure-minute/shard-1/seg-20230626/index -24M ./measure-minute/shard-1/seg-20230626/block-2023062607/lsm -19M ./measure-minute/shard-1/seg-20230626/block-2023062607/encoded -26M ./measure-minute/shard-1/seg-20230626/block-2023062607/tst -68M ./measure-minute/shard-1/seg-20230626/block-2023062607 -68M ./measure-minute/shard-1/seg-20230626 -4.4M ./measure-minute/shard-1/series/md -2.2M ./measure-minute/shard-1/series/inverted -20K ./measure-minute/shard-1/series/lsm -6.5M ./measure-minute/shard-1/series -74M ./measure-minute/shard-1 -146M ./measure-minute -4.0K ./measure-default/shard-0/seg-20230626/index -79M ./measure-default/shard-0/seg-20230626/block-2023062607/lsm -65M ./measure-default/shard-0/seg-20230626/block-2023062607/encoded -85M ./measure-default/shard-0/seg-20230626/block-2023062607/tst -228M ./measure-default/shard-0/seg-20230626/block-2023062607 -228M ./measure-default/shard-0/seg-20230626 -16M ./measure-default/shard-0/series/md -2.4M ./measure-default/shard-0/series/inverted -20K ./measure-default/shard-0/series/lsm -18M ./measure-default/shard-0/series -245M ./measure-default/shard-0 -245M ./measure-default -391M . +measure: 446 MB +measure/measure-default: 272 MB +measure/measure-default/shard-0: 272 MB +measure/measure-default/shard-0/buffer-0: 1.4 MB +measure/measure-default/shard-0/buffer-1: 2.8 MB +measure/measure-default/shard-0/seg-20231015: 247 MB +measure/measure-default/shard-0/seg-20231015/block-2023101516: 247 MB +measure/measure-default/shard-0/seg-20231015/block-2023101516/encoded: 74 MB +measure/measure-default/shard-0/seg-20231015/block-2023101516/lsm: 83 MB +measure/measure-default/shard-0/seg-20231015/block-2023101516/tst: 90 MB +measure/measure-default/shard-0/seg-20231015/index: 0 B +measure/measure-default/shard-0/series: 21 MB +measure/measure-default/shard-0/series/inverted: 2.9 MB +measure/measure-default/shard-0/series/lsm: 1.0 MB +measure/measure-default/shard-0/series/md: 17 MB +measure/measure-minute: 173 MB +measure/measure-minute/shard-0: 89 MB +measure/measure-minute/shard-0/buffer-0: 2.0 MB +measure/measure-minute/shard-0/buffer-1: 1.6 MB +measure/measure-minute/shard-0/seg-20231015: 76 MB +measure/measure-minute/shard-0/seg-20231015/block-2023101516: 76 MB +measure/measure-minute/shard-0/seg-20231015/block-2023101516/encoded: 23 MB +measure/measure-minute/shard-0/seg-20231015/block-2023101516/lsm: 26 MB +measure/measure-minute/shard-0/seg-20231015/block-2023101516/tst: 28 MB +measure/measure-minute/shard-0/seg-20231015/index: 0 B +measure/measure-minute/shard-0/series: 8.7 MB +measure/measure-minute/shard-0/series/inverted: 2.1 MB +measure/measure-minute/shard-0/series/lsm: 1.0 MB +measure/measure-minute/shard-0/series/md: 5.6 MB +measure/measure-minute/shard-1: 84 MB +measure/measure-minute/shard-1/buffer-0: 1.5 MB +measure/measure-minute/shard-1/buffer-1: 698 kB +measure/measure-minute/shard-1/seg-20231015: 73 MB +measure/measure-minute/shard-1/seg-20231015/block-2023101516: 73 MB +measure/measure-minute/shard-1/seg-20231015/block-2023101516/encoded: 19 MB +measure/measure-minute/shard-1/seg-20231015/block-2023101516/lsm: 26 MB +measure/measure-minute/shard-1/seg-20231015/block-2023101516/tst: 28 MB +measure/measure-minute/shard-1/seg-20231015/index: 0 B +measure/measure-minute/shard-1/series: 9.3 MB +measure/measure-minute/shard-1/series/inverted: 2.7 MB +measure/measure-minute/shard-1/series/lsm: 1.0 MB +measure/measure-minute/shard-1/series/md: 5.6 MB ``` diff --git a/test/stress/cases/istio/testdata/access.tar.bz2 b/test/stress/cases/istio/testdata/access.tar.bz2 new file mode 100644 index 000000000..1384b47cf Binary files /dev/null and b/test/stress/cases/istio/testdata/access.tar.bz2 differ diff --git a/test/stress/cases/istio/testdata/access.tar.gz b/test/stress/cases/istio/testdata/access.tar.gz deleted file mode 100644 index 5608dca89..000000000 Binary files a/test/stress/cases/istio/testdata/access.tar.gz and /dev/null differ