Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance metrics instrument #517

Merged
merged 7 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Release Notes.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.
- Optimize query performance of series index.
- Add storage metrics.

### Bugs

Expand Down
27 changes: 18 additions & 9 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,36 @@ type Position struct {
Database string
Shard string
Segment string
Block string
}

// LabelNames returns the label names of Position.
func LabelNames() []string {
return []string{"module", "database", "shard", "seg", "block"}
// DBLabelNames returns the label names of Position in the database level.
func DBLabelNames() []string {
return []string{"group"}
}

// SegLabelNames returns the label names of Position in the segment level.
func SegLabelNames() []string {
return []string{"seg"}
}

// ShardLabelNames returns the label names of Position. It is used for shard level metrics.
func ShardLabelNames() []string {
return []string{"module", "database", "shard"}
return []string{"seg", "shard"}
}

// DBLabelValues returns the label values of Position in the database level.
func (p Position) DBLabelValues() []string {
return []string{p.Database}
}

// LabelValues returns the label values of Position.
func (p Position) LabelValues() []string {
return []string{p.Module, p.Database, p.Shard, p.Segment, p.Block}
// SegLabelValues returns the label values of Position.
func (p Position) SegLabelValues() []string {
return []string{p.Segment}
}

// ShardLabelValues returns the label values of Position. It is used for shard level metrics.
func (p Position) ShardLabelValues() []string {
return []string{p.Module, p.Database, p.Shard}
return []string{p.Segment, p.Shard}
}

// SetPosition sets a position returned from fn to attach it to ctx, then return a new context.
Expand Down
21 changes: 15 additions & 6 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,28 @@ func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1
}

type seriesIndex struct {
store index.SeriesStore
l *logger.Logger
store index.SeriesStore
l *logger.Logger
metrics *inverted.Metrics
p common.Position
}

func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64) (*seriesIndex, error) {
func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64, metrics *inverted.Metrics) (*seriesIndex, error) {
si := &seriesIndex{
l: logger.Fetch(ctx, "series_index"),
p: common.GetPosition(ctx),
}
var err error
if si.store, err = inverted.NewStore(inverted.StoreOpts{
opts := inverted.StoreOpts{
Path: path.Join(root, "sidx"),
Logger: si.l,
BatchWaitSec: flushTimeoutSeconds,
}); err != nil {
}
if metrics != nil {
opts.Metrics = metrics
si.metrics = opts.Metrics
}
var err error
if si.store, err = inverted.NewStore(opts); err != nil {
return nil, err
}
return si, nil
Expand Down Expand Up @@ -270,5 +278,6 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
}

func (s *seriesIndex) Close() error {
s.metrics.DeleteAll(s.p.SegLabelValues()...)
return s.store.Close()
}
4 changes: 2 additions & 2 deletions banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func TestSeriesIndex_Primary(t *testing.T) {
ctx := context.Background()
path, fn := setUp(require.New(t))
si, err := newSeriesIndex(ctx, path, 0)
si, err := newSeriesIndex(ctx, path, 0, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, si.Close())
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
require.NoError(t, si.Write(docs))
// Restart the index
require.NoError(t, si.Close())
si, err = newSeriesIndex(ctx, path, 0)
si, err = newSeriesIndex(ctx, path, 0, nil)
require.NoError(t, err)
tests := []struct {
name string
Expand Down
112 changes: 112 additions & 0 deletions banyand/internal/storage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package storage

import (
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/meter"
)

type metrics struct {
lastTickTime meter.Gauge
totalSegRefs meter.Gauge

totalRotationStarted meter.Counter
totalRotationFinished meter.Counter
totalRotationErr meter.Counter

totalRetentionStarted meter.Counter
totalRetentionFinished meter.Counter
totalRetentionHasData meter.Counter
totalRetentionErr meter.Counter
totalRetentionHasDataLatency meter.Counter
}

func newMetrics(factory *observability.Factory) *metrics {
if factory == nil {
return nil
}
return &metrics{
lastTickTime: factory.NewGauge("last_tick_time"),
totalSegRefs: factory.NewGauge("total_segment_refs"),
totalRotationStarted: factory.NewCounter("total_rotation_started"),
totalRotationFinished: factory.NewCounter("total_rotation_finished"),
totalRotationErr: factory.NewCounter("total_rotation_err"),
totalRetentionStarted: factory.NewCounter("total_retention_started"),
totalRetentionFinished: factory.NewCounter("total_retention_finished"),
totalRetentionErr: factory.NewCounter("total_retention_err"),
totalRetentionHasDataLatency: factory.NewCounter("total_retention_has_data_latency"),
totalRetentionHasData: factory.NewCounter("total_retention_has_data"),
}
}

func (d *database[T, O]) incTotalRotationStarted(delta int) {
if d.metrics == nil {
return
}
d.metrics.totalRotationStarted.Inc(float64(delta))
}

func (d *database[T, O]) incTotalRotationFinished(delta int) {
if d.metrics == nil {
return
}
d.metrics.totalRotationFinished.Inc(float64(delta))
}

func (d *database[T, O]) incTotalRotationErr(delta int) {
if d.metrics == nil {
return
}
d.metrics.totalRotationErr.Inc(float64(delta))
}

func (d *database[T, O]) incTotalRetentionStarted(delta int) {
if d.metrics == nil {
return
}
d.metrics.totalRetentionStarted.Inc(float64(delta))
}

func (d *database[T, O]) incTotalRetentionFinished(delta int) {
if d.metrics == nil {
return
}
d.metrics.totalRetentionFinished.Inc(float64(delta))
}

func (d *database[T, O]) incTotalRetentionHasData(delta int) {
if d.metrics == nil {
return
}
d.metrics.totalRetentionHasData.Inc(float64(delta))
}

func (d *database[T, O]) incTotalRetentionErr(delta int) {
if d.metrics == nil {
return
}
d.metrics.totalRetentionErr.Inc(float64(delta))
}

func (d *database[T, O]) incTotalRetentionHasDataLatency(delta float64) {
if d.metrics == nil {
return
}
d.metrics.totalRetentionHasDataLatency.Inc(delta)
}
14 changes: 13 additions & 1 deletion banyand/internal/storage/rotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ func (d *database[T, O]) startRotationTask() error {
if gap <= 0 || gap > newSegmentTimeGap {
return
}
d.incTotalRotationStarted(1)
defer d.incTotalRotationFinished(1)
start := d.segmentController.opts.SegmentInterval.nextTime(t)
d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment")
_, err := d.segmentController.create(start)
if err != nil {
d.logger.Error().Err(err).Msgf("failed to create new segment.")
d.incTotalRotationErr(1)
}
}()
}(ts)
Expand Down Expand Up @@ -110,9 +113,18 @@ func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool {
<-rc.running
}()

rc.database.incTotalRetentionStarted(1)
defer rc.database.incTotalRetentionFinished(1)
deadline := now.Add(-rc.duration)
if err := rc.database.segmentController.remove(deadline); err != nil {
start := time.Now()
hasData, err := rc.database.segmentController.remove(deadline)
if hasData {
rc.database.incTotalRetentionHasData(1)
rc.database.incTotalRetentionHasDataLatency(time.Since(start).Seconds())
}
if err != nil {
l.Error().Err(err)
rc.database.incTotalRetentionErr(1)
}
return true
}
15 changes: 14 additions & 1 deletion banyand/internal/storage/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
Expand Down Expand Up @@ -168,8 +169,20 @@ func (m *MockTSTable) Close() error {
return nil
}

func (m *MockTSTable) Collect(_ Metrics) {}

var MockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
_ *logger.Logger, _ timestamp.TimeRange, _ any,
_ *logger.Logger, _ timestamp.TimeRange, _, _ any,
) (*MockTSTable, error) {
return &MockTSTable{}, nil
}

type MockMetrics struct{}

func (m *MockMetrics) DeleteAll() {}

func (m *MockMetrics) Factory() *observability.Factory {
return nil
}

var MockMetricsCreator = func(_ common.Position) Metrics { return &MockMetrics{} }
Loading
Loading