Skip to content

Commit

Permalink
Add performance metrics instrument (apache#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored and StLeoX committed Sep 24, 2024
1 parent bbf13a6 commit ec58eec
Show file tree
Hide file tree
Showing 65 changed files with 2,196 additions and 427 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Release Notes.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.
- Optimize query performance of series index.
- Add storage metrics.

### Bugs

Expand Down
27 changes: 18 additions & 9 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,36 @@ type Position struct {
Database string
Shard string
Segment string
Block string
}

// LabelNames returns the label names of Position.
func LabelNames() []string {
return []string{"module", "database", "shard", "seg", "block"}
// DBLabelNames lists the label names used for database-level metrics.
// There is exactly one: "group".
func DBLabelNames() []string {
	const groupLabel = "group"
	return []string{groupLabel}
}

// SegLabelNames lists the label names used for segment-level metrics.
// There is exactly one: "seg".
func SegLabelNames() []string {
	const segLabel = "seg"
	return []string{segLabel}
}

// ShardLabelNames returns the label names of Position. It is used for shard level metrics.
func ShardLabelNames() []string {
return []string{"module", "database", "shard"}
return []string{"seg", "shard"}
}

// DBLabelValues returns the database-level label values for p: a one-element
// slice holding p.Database.
func (p Position) DBLabelValues() []string {
	values := make([]string, 1)
	values[0] = p.Database
	return values
}

// LabelValues returns the label values of Position.
func (p Position) LabelValues() []string {
return []string{p.Module, p.Database, p.Shard, p.Segment, p.Block}
// SegLabelValues returns the segment-level label values for p: a one-element
// slice holding p.Segment.
func (p Position) SegLabelValues() []string {
	values := make([]string, 1)
	values[0] = p.Segment
	return values
}

// ShardLabelValues returns the label values of Position. It is used for shard level metrics.
func (p Position) ShardLabelValues() []string {
return []string{p.Module, p.Database, p.Shard}
return []string{p.Segment, p.Shard}
}

// SetPosition sets a position returned from fn to attach it to ctx, then return a new context.
Expand Down
21 changes: 15 additions & 6 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,28 @@ func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1
}

type seriesIndex struct {
store index.SeriesStore
l *logger.Logger
store index.SeriesStore
l *logger.Logger
metrics *inverted.Metrics
p common.Position
}

func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64) (*seriesIndex, error) {
func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64, metrics *inverted.Metrics) (*seriesIndex, error) {
si := &seriesIndex{
l: logger.Fetch(ctx, "series_index"),
p: common.GetPosition(ctx),
}
var err error
if si.store, err = inverted.NewStore(inverted.StoreOpts{
opts := inverted.StoreOpts{
Path: path.Join(root, "sidx"),
Logger: si.l,
BatchWaitSec: flushTimeoutSeconds,
}); err != nil {
}
if metrics != nil {
opts.Metrics = metrics
si.metrics = opts.Metrics
}
var err error
if si.store, err = inverted.NewStore(opts); err != nil {
return nil, err
}
return si, nil
Expand Down Expand Up @@ -270,5 +278,6 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
}

// Close asks the attached metrics to DeleteAll for this index's segment label
// values and then closes the underlying store, returning the store's error.
//
// metrics is optional: newSeriesIndex only sets it when the caller supplies a
// non-nil value. DeleteAll's implementation is not visible from here, so the
// cleanup is skipped explicitly instead of relying on it tolerating a nil
// receiver.
func (s *seriesIndex) Close() error {
	if s.metrics != nil {
		s.metrics.DeleteAll(s.p.SegLabelValues()...)
	}
	return s.store.Close()
}
4 changes: 2 additions & 2 deletions banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func TestSeriesIndex_Primary(t *testing.T) {
ctx := context.Background()
path, fn := setUp(require.New(t))
si, err := newSeriesIndex(ctx, path, 0)
si, err := newSeriesIndex(ctx, path, 0, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, si.Close())
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
require.NoError(t, si.Write(docs))
// Restart the index
require.NoError(t, si.Close())
si, err = newSeriesIndex(ctx, path, 0)
si, err = newSeriesIndex(ctx, path, 0, nil)
require.NoError(t, err)
tests := []struct {
name string
Expand Down
112 changes: 112 additions & 0 deletions banyand/internal/storage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package storage

import (
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/meter"
)

// metrics bundles the meters a database keeps about its own housekeeping.
// The counters are updated through the inc* helpers on database, which skip
// the update when no metrics are attached.
type metrics struct {
	// Gauges created as "last_tick_time" and "total_segment_refs".
	lastTickTime, totalSegRefs meter.Gauge

	// Segment rotation: attempts started, attempts finished, and failures.
	totalRotationStarted, totalRotationFinished, totalRotationErr meter.Counter

	// Retention runs: started and finished.
	totalRetentionStarted, totalRetentionFinished meter.Counter

	// Retention runs that reported data, and retention failures.
	totalRetentionHasData, totalRetentionErr meter.Counter

	// Accumulated latency of the retention runs that reported data.
	totalRetentionHasDataLatency meter.Counter
}

// newMetrics creates every storage meter through factory and returns them
// bundled in a metrics value. A nil factory yields a nil *metrics, which the
// inc* helpers on database treat as "metrics disabled".
func newMetrics(factory *observability.Factory) *metrics {
	if factory == nil {
		return nil
	}

	m := new(metrics)

	m.lastTickTime = factory.NewGauge("last_tick_time")
	m.totalSegRefs = factory.NewGauge("total_segment_refs")

	m.totalRotationStarted = factory.NewCounter("total_rotation_started")
	m.totalRotationFinished = factory.NewCounter("total_rotation_finished")
	m.totalRotationErr = factory.NewCounter("total_rotation_err")

	m.totalRetentionStarted = factory.NewCounter("total_retention_started")
	m.totalRetentionFinished = factory.NewCounter("total_retention_finished")
	m.totalRetentionErr = factory.NewCounter("total_retention_err")
	m.totalRetentionHasDataLatency = factory.NewCounter("total_retention_has_data_latency")
	m.totalRetentionHasData = factory.NewCounter("total_retention_has_data")

	return m
}

// incTotalRotationStarted adds delta to the rotation-started counter.
// It is a no-op when the database has no metrics attached.
func (d *database[T, O]) incTotalRotationStarted(delta int) {
	if m := d.metrics; m != nil {
		m.totalRotationStarted.Inc(float64(delta))
	}
}

// incTotalRotationFinished adds delta to the rotation-finished counter.
// It is a no-op when the database has no metrics attached.
func (d *database[T, O]) incTotalRotationFinished(delta int) {
	if m := d.metrics; m != nil {
		m.totalRotationFinished.Inc(float64(delta))
	}
}

// incTotalRotationErr adds delta to the rotation-error counter.
// It is a no-op when the database has no metrics attached.
func (d *database[T, O]) incTotalRotationErr(delta int) {
	if m := d.metrics; m != nil {
		m.totalRotationErr.Inc(float64(delta))
	}
}

// incTotalRetentionStarted adds delta to the retention-started counter.
// It is a no-op when the database has no metrics attached.
func (d *database[T, O]) incTotalRetentionStarted(delta int) {
	if m := d.metrics; m != nil {
		m.totalRetentionStarted.Inc(float64(delta))
	}
}

// incTotalRetentionFinished adds delta to the retention-finished counter.
// It is a no-op when the database has no metrics attached.
func (d *database[T, O]) incTotalRetentionFinished(delta int) {
	if m := d.metrics; m != nil {
		m.totalRetentionFinished.Inc(float64(delta))
	}
}

// incTotalRetentionHasData adds delta to the counter of retention runs that
// reported data. It is a no-op when the database has no metrics attached.
func (d *database[T, O]) incTotalRetentionHasData(delta int) {
	if m := d.metrics; m != nil {
		m.totalRetentionHasData.Inc(float64(delta))
	}
}

// incTotalRetentionErr adds delta to the retention-error counter.
// It is a no-op when the database has no metrics attached.
func (d *database[T, O]) incTotalRetentionErr(delta int) {
	if m := d.metrics; m != nil {
		m.totalRetentionErr.Inc(float64(delta))
	}
}

// incTotalRetentionHasDataLatency adds delta to the accumulated latency of
// retention runs that reported data. Unlike the other inc* helpers it takes
// the delta as a float64 and passes it through unchanged. It is a no-op when
// the database has no metrics attached.
func (d *database[T, O]) incTotalRetentionHasDataLatency(delta float64) {
	if m := d.metrics; m != nil {
		m.totalRetentionHasDataLatency.Inc(delta)
	}
}
14 changes: 13 additions & 1 deletion banyand/internal/storage/rotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ func (d *database[T, O]) startRotationTask() error {
if gap <= 0 || gap > newSegmentTimeGap {
return
}
d.incTotalRotationStarted(1)
defer d.incTotalRotationFinished(1)
start := d.segmentController.opts.SegmentInterval.nextTime(t)
d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment")
_, err := d.segmentController.create(start)
if err != nil {
d.logger.Error().Err(err).Msgf("failed to create new segment.")
d.incTotalRotationErr(1)
}
}()
}(ts)
Expand Down Expand Up @@ -110,9 +113,18 @@ func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool {
<-rc.running
}()

rc.database.incTotalRetentionStarted(1)
defer rc.database.incTotalRetentionFinished(1)
deadline := now.Add(-rc.duration)
if err := rc.database.segmentController.remove(deadline); err != nil {
start := time.Now()
hasData, err := rc.database.segmentController.remove(deadline)
if hasData {
rc.database.incTotalRetentionHasData(1)
rc.database.incTotalRetentionHasDataLatency(time.Since(start).Seconds())
}
if err != nil {
l.Error().Err(err)
rc.database.incTotalRetentionErr(1)
}
return true
}
15 changes: 14 additions & 1 deletion banyand/internal/storage/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
Expand Down Expand Up @@ -168,8 +169,20 @@ func (m *MockTSTable) Close() error {
return nil
}

// Collect ignores the given Metrics and does nothing.
func (*MockTSTable) Collect(Metrics) {}

var MockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
_ *logger.Logger, _ timestamp.TimeRange, _ any,
_ *logger.Logger, _ timestamp.TimeRange, _, _ any,
) (*MockTSTable, error) {
return &MockTSTable{}, nil
}

// MockMetrics is a stateless metrics stand-in used by the storage tests.
type MockMetrics struct{}

// DeleteAll does nothing.
func (*MockMetrics) DeleteAll() {}

// Factory always returns nil.
func (*MockMetrics) Factory() *observability.Factory {
	return nil
}

// MockMetricsCreator ignores the given position and returns a new
// *MockMetrics as a Metrics.
var MockMetricsCreator = func(common.Position) Metrics {
	return new(MockMetrics)
}
Loading

0 comments on commit ec58eec

Please sign in to comment.