Skip to content

Commit

Permalink
Scheduler metrics and Several Bugs (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Aug 24, 2024
1 parent 0816ebf commit 7e541d0
Show file tree
Hide file tree
Showing 17 changed files with 216 additions and 26 deletions.
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Release Notes.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.
- Optimize query performance of series index.
- Add storage metrics.
- Add liaison, remote queue, storage(rotation), time-series tables, metadata cache and scheduler metrics.

### Bugs

Expand All @@ -38,6 +38,8 @@ Release Notes.
- Fix a bug where a distributed query would return an empty result if the "limit" was set much lower than the "offset".
- Fix duplicated measure data in a single part.
- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
- Fix panic when removing an expired segment.
- Fix panic when reading a disordered measure block, i.e., a block whose versions are not sorted in descending order.

### Documentation

Expand Down
19 changes: 15 additions & 4 deletions banyand/dquery/dquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,31 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/schema"
)

const (
moduleName = "distributed-query"
)

var _ run.Service = (*queryService)(nil)
var (
_ run.Service = (*queryService)(nil)
distributedQueryScope = observability.RootScope.SubScope("dquery")
streamScope = distributedQueryScope.SubScope("stream")
measureScope = distributedQueryScope.SubScope("measure")
)

type queryService struct {
metaService metadata.Repo
pipeline queue.Server
omr observability.MetricsRegistry
log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
Expand All @@ -55,12 +63,13 @@ type queryService struct {
}

// NewService return a new query service.
func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster,
func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster, omr observability.MetricsRegistry,
) (run.Unit, error) {
svc := &queryService{
metaService: metaService,
closer: run.NewCloser(1),
pipeline: pipeline,
omr: omr,
}
svc.sqp = &streamQueryProcessor{
queryService: svc,
Expand Down Expand Up @@ -89,8 +98,10 @@ func (q *queryService) PreRun(ctx context.Context) error {
node := val.(common.Node)
q.nodeID = node.NodeID
q.log = logger.GetLogger(moduleName)
q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log)
q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log,
schema.NewMetrics(q.omr.With(streamScope)))
q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log,
schema.NewMetrics(q.omr.With(measureScope)))
return multierr.Combine(
q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
Expand Down
3 changes: 3 additions & 0 deletions banyand/internal/storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type metrics struct {
totalRetentionHasData meter.Counter
totalRetentionErr meter.Counter
totalRetentionHasDataLatency meter.Counter

schedulerMetrics *observability.SchedulerMetrics
}

func newMetrics(factory *observability.Factory) *metrics {
Expand All @@ -52,6 +54,7 @@ func newMetrics(factory *observability.Factory) *metrics {
totalRetentionErr: factory.NewCounter("total_retention_err"),
totalRetentionHasDataLatency: factory.NewCounter("total_retention_has_data_latency"),
totalRetentionHasData: factory.NewCounter("total_retention_has_data"),
schedulerMetrics: observability.NewSchedulerMetrics(factory),
}
}

Expand Down
3 changes: 2 additions & 1 deletion banyand/internal/storage/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool,
for _, s := range sc.segments() {
if s.Before(deadline) {
hasSegment = true
id := s.id
s.delete()
sc.Lock()
sc.removeSeg(s.id)
sc.removeSeg(id)
sc.Unlock()
sc.l.Info().Stringer("segment", s).Msg("removed a segment")
}
Expand Down
4 changes: 4 additions & 0 deletions banyand/internal/storage/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (d *database[T, O]) collect() {
refCount += atomic.LoadInt32(&s.refCount)
}
d.totalSegRefs.Set(float64(refCount))
metrics := d.scheduler.Metrics()
for job, m := range metrics {
d.metrics.schedulerMetrics.Collect(job, m)
}
}

type walkFn func(suffix string) error
Expand Down
2 changes: 2 additions & 0 deletions banyand/liaison/grpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type metrics struct {
totalStarted meter.Counter
totalFinished meter.Counter
totalErr meter.Counter
totalPanic meter.Counter
totalLatency meter.Counter

totalStreamStarted meter.Counter
Expand All @@ -49,6 +50,7 @@ func newMetrics(factory *observability.Factory) *metrics {
totalStarted: factory.NewCounter("total_started", "group", "service", "method"),
totalFinished: factory.NewCounter("total_finished", "group", "service", "method"),
totalErr: factory.NewCounter("total_err", "group", "service", "method"),
totalPanic: factory.NewCounter("total_panic"),
totalLatency: factory.NewCounter("total_latency", "group", "service", "method"),
totalStreamStarted: factory.NewCounter("total_stream_started", "service", "method"),
totalStreamFinished: factory.NewCounter("total_stream_finished", "service", "method"),
Expand Down
4 changes: 3 additions & 1 deletion banyand/liaison/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type server struct {
streamSVC *streamService
*streamRegistryServer
*indexRuleBindingRegistryServer
metrics *metrics
keyFile string
certFile string
accessLogRootPath string
Expand Down Expand Up @@ -157,6 +158,7 @@ func (s *server) PreRun(_ context.Context) error {
}
}
metrics := newMetrics(s.omr.With(liaisonGrpcScope))
s.metrics = metrics
s.streamSVC.metrics = metrics
s.measureSVC.metrics = metrics
s.propertyServer.metrics = metrics
Expand Down Expand Up @@ -229,7 +231,7 @@ func (s *server) Serve() run.StopNotify {
}
grpcPanicRecoveryHandler := func(p any) (err error) {
s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")

s.metrics.totalPanic.Inc(1)
return status.Errorf(codes.Internal, "%s", p)
}

Expand Down
38 changes: 38 additions & 0 deletions banyand/measure/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,44 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[commo
}
}

// replace overwrites the newest (last) row of r in place with the data point
// at bc.idx: series ID, timestamp, version, every projected tag value, and
// every field value. Tag values present in storedIndexValue (keyed by series
// ID, then tag name) take precedence over values decoded from the block.
func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue) {
	r.SID = bc.bm.seriesID
	r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx]
	r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
	// Index-sourced tag values for this series, if any were provided.
	var indexValue map[string]*modelv1.TagValue
	if storedIndexValue != nil {
		indexValue = storedIndexValue[r.SID]
	}
	for fi := range r.TagFamilies {
		family := &r.TagFamilies[fi]
		// cf is resolved lazily: only looked up once the first tag of this
		// family actually needs data decoded from the block.
		var cf *columnFamily
		for ti := range family.Tags {
			tag := &family.Tags[ti]
			if indexValue != nil && indexValue[tag.Name] != nil {
				// The stored index already has this tag; no block decode needed.
				tag.Values[len(tag.Values)-1] = indexValue[tag.Name]
				continue
			}
			if cf == nil {
				for ci := range bc.tagFamilies {
					if bc.tagFamilies[ci].name == family.Name {
						cf = &bc.tagFamilies[ci]
						break
					}
				}
			}
			// NOTE(review): assumes a matching column family always exists in
			// bc.tagFamilies here; cf would be nil otherwise — confirm upstream
			// projection guarantees this.
			for ci := range cf.columns {
				c := &cf.columns[ci]
				if c.name == tag.Name {
					tag.Values[len(tag.Values)-1] = mustDecodeTagValue(c.valueType, c.values[bc.idx])
					break
				}
			}
		}
	}
	// Fields are positionally aligned with bc.fields.columns.
	for fi := range bc.fields.columns {
		c := &bc.fields.columns[fi]
		r.Fields[fi].Values[len(r.Fields[fi].Values)-1] = mustDecodeFieldValue(c.valueType, c.values[bc.idx])
	}
}

func (bc *blockCursor) loadData(tmpBlock *block) bool {
tmpBlock.reset()
cfm := make([]columnMetadata, 0, len(bc.fieldProjection))
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func Test_mergeParts(t *testing.T) {
},
{
name: "Test with multiple parts with same ts",
dpsList: []*dataPoints{dpsTS1, dpsTS1, dpsTS1},
dpsList: []*dataPoints{dpsTS11, dpsTS1},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
Expand Down
6 changes: 5 additions & 1 deletion banyand/measure/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)

var metadataScope = measureScope.SubScope("metadata")

// SchemaService allows querying schema information.
type SchemaService interface {
Query
Expand All @@ -58,21 +60,23 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
svc.metadata,
svc.l,
newSupplier(path, svc),
resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
),
}
sr.start()
return sr
}

// NewPortableRepository creates a new portable repository.
func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) SchemaService {
func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics *resourceSchema.Metrics) SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
Repository: resourceSchema.NewPortableRepository(
metadata,
l,
newPortableSupplier(metadata, l),
metrics,
),
}
r.start()
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo
if len(result.Timestamps) > 0 &&
topBC.timestamps[topBC.idx] == result.Timestamps[len(result.Timestamps)-1] {
if topBC.versions[topBC.idx] > lastVersion {
logger.Panicf("following parts version should be less or equal to the previous one")
topBC.replace(result, storedIndexValue)
}
} else {
topBC.copyTo(result, storedIndexValue, tagProjection)
Expand Down
69 changes: 55 additions & 14 deletions banyand/observability/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/meter/native"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
Expand All @@ -45,6 +46,8 @@ const (
var (
_ run.Service = (*metricService)(nil)
_ run.Config = (*metricService)(nil)

obScope = RootScope.SubScope("observability")
)

// Service type for Metric Service.
Expand All @@ -65,20 +68,21 @@ func NewMetricService(metadata metadata.Repo, pipeline queue.Client, nodeType st
}

type metricService struct {
metadata metadata.Repo
nodeSelector native.NodeSelector
pipeline queue.Client
scheduler *timestamp.Scheduler
l *logger.Logger
closer *run.Closer
svr *http.Server
nCollection *native.MetricCollection
promReg *prometheus.Registry
npf nativeProviderFactory
listenAddr string
nodeType string
modes []string
mutex sync.Mutex
metadata metadata.Repo
nodeSelector native.NodeSelector
pipeline queue.Client
svr *http.Server
l *logger.Logger
closer *run.Closer
scheduler *timestamp.Scheduler
nCollection *native.MetricCollection
promReg *prometheus.Registry
schedulerMetrics *SchedulerMetrics
npf nativeProviderFactory
listenAddr string
nodeType string
modes []string
mutex sync.Mutex
}

func (p *metricService) FlagSet() *run.FlagSet {
Expand Down Expand Up @@ -143,8 +147,13 @@ func (p *metricService) Serve() run.StopNotify {
p.initMetrics()
clock, _ := timestamp.GetClock(context.TODO())
p.scheduler = timestamp.NewScheduler(p.l, clock)
p.schedulerMetrics = NewSchedulerMetrics(p.With(obScope))
err := p.scheduler.Register("metrics-collector", cron.Descriptor, "@every 15s", func(_ time.Time, _ *logger.Logger) bool {
MetricsCollector.collect()
metrics := p.scheduler.Metrics()
for job, m := range metrics {
p.schedulerMetrics.Collect(job, m)
}
return true
})
if err != nil {
Expand Down Expand Up @@ -207,3 +216,35 @@ func containsMode(modes []string, mode string) bool {
}
return false
}

// SchedulerMetrics is the metrics for scheduler.
// Each gauge carries a "job" label and is populated from a
// *timestamp.SchedulerMetrics snapshot via Collect.
type SchedulerMetrics struct {
	// totalJobsStarted counts jobs that have started.
	totalJobsStarted meter.Gauge
	// totalJobsFinished counts jobs that have finished.
	totalJobsFinished meter.Gauge
	// totalTasksStarted counts task executions that have started.
	totalTasksStarted meter.Gauge
	// totalTasksFinished counts task executions that have finished.
	totalTasksFinished meter.Gauge
	// totalTasksPanic counts task executions that panicked.
	totalTasksPanic meter.Gauge
	// totalTaskLatency accumulates task latency, exported in seconds.
	totalTaskLatency meter.Gauge
}

// NewSchedulerMetrics creates a new scheduler metrics.
// Every gauge is registered through the given factory and is labeled by "job".
func NewSchedulerMetrics(factory *Factory) *SchedulerMetrics {
	sm := &SchedulerMetrics{
		totalTaskLatency:   factory.NewGauge("scheduler_task_latency", "job"),
		totalTasksPanic:    factory.NewGauge("scheduler_tasks_panic", "job"),
		totalTasksFinished: factory.NewGauge("scheduler_tasks_finished", "job"),
		totalTasksStarted:  factory.NewGauge("scheduler_tasks_started", "job"),
		totalJobsFinished:  factory.NewGauge("scheduler_jobs_finished", "job"),
		totalJobsStarted:   factory.NewGauge("scheduler_jobs_started", "job"),
	}
	return sm
}

// Collect publishes the scheduler metrics snapshot m for the given job,
// setting each gauge with the job name as its label value.
func (sm *SchedulerMetrics) Collect(job string, m *timestamp.SchedulerMetrics) {
	// The scheduler reports latency in nanoseconds; export it in seconds.
	latencySeconds := float64(m.TotalTaskLatencyInNanoseconds.Load()) / float64(time.Second)
	sm.totalTaskLatency.Set(latencySeconds, job)
	sm.totalTasksPanic.Set(float64(m.TotalTasksPanic.Load()), job)
	sm.totalTasksFinished.Set(float64(m.TotalTasksFinished.Load()), job)
	sm.totalTasksStarted.Set(float64(m.TotalTasksStarted.Load()), job)
	sm.totalJobsFinished.Set(float64(m.TotalJobsFinished.Load()), job)
	sm.totalJobsStarted.Set(float64(m.TotalJobsStarted.Load()), job)
}
6 changes: 5 additions & 1 deletion banyand/stream/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)

var metadataScope = streamScope.SubScope("metadata")

// SchemaService allows querying schema information.
type SchemaService interface {
Query
Expand All @@ -59,21 +61,23 @@ func newSchemaRepo(path string, svc *service) schemaRepo {
svc.metadata,
svc.l,
newSupplier(path, svc),
resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
),
}
sr.start()
return sr
}

// NewPortableRepository creates a new portable repository.
func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) SchemaService {
func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics *resourceSchema.Metrics) SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
Repository: resourceSchema.NewPortableRepository(
metadata,
l,
newPortableSupplier(metadata, l),
metrics,
),
}
r.start()
Expand Down
Loading

0 comments on commit 7e541d0

Please sign in to comment.