From 7e541d0275cd776360a14c75044be4bdac872b11 Mon Sep 17 00:00:00 2001
From: Gao Hongtao
Date: Sat, 24 Aug 2024 09:55:33 +0800
Subject: [PATCH] Scheduler metrics and Several Bugs (#519)

---
 CHANGES.md                          |  4 +-
 banyand/dquery/dquery.go            | 19 ++++++--
 banyand/internal/storage/metrics.go |  3 ++
 banyand/internal/storage/segment.go |  3 +-
 banyand/internal/storage/tsdb.go    |  4 ++
 banyand/liaison/grpc/metrics.go     |  2 +
 banyand/liaison/grpc/server.go      |  4 +-
 banyand/measure/block.go            | 38 ++++++++++++++++
 banyand/measure/merger_test.go      |  2 +-
 banyand/measure/metadata.go         |  6 ++-
 banyand/measure/query.go            |  2 +-
 banyand/observability/service.go    | 69 +++++++++++++++++++++++------
 banyand/stream/metadata.go          |  6 ++-
 pkg/cmdsetup/liaison.go             |  2 +-
 pkg/schema/cache.go                 |  8 ++++
 pkg/schema/metrics.go               | 39 ++++++++++++++++
 pkg/timestamp/scheduler.go          | 31 +++++++++++++
 17 files changed, 216 insertions(+), 26 deletions(-)
 create mode 100644 pkg/schema/metrics.go

diff --git a/CHANGES.md b/CHANGES.md
index 05910c3c5..57d781eb1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,7 +23,7 @@ Release Notes.
 - Add the topN query trace.
 - Introduce the round-robin selector to Liaison Node.
 - Optimize query performance of series index.
-- Add storage metrics.
+- Add liaison, remote queue, storage (rotation), time-series tables, metadata cache and scheduler metrics.
 
 ### Bugs
 
@@ -38,6 +38,8 @@ Release Notes.
 - Fix a bug where a distributed query would return an empty result if the "limit" was set much lower than the "offset".
 - Fix duplicated measure data in a single part.
 - Fix several "sync.Pool" leak issues by adding a tracker to the pool.
+- Fix panic when removing an expired segment.
+- Fix panic when reading a disordered measure block whose versions are not sorted in descending order.
 
 ### Documentation
 
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index d378febbe..14e9e7753 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -29,23 +29,31 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/measure"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/stream"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/query/executor"
 	"github.com/apache/skywalking-banyandb/pkg/run"
+	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 const (
 	moduleName = "distributed-query"
 )
 
-var _ run.Service = (*queryService)(nil)
+var (
+	_ run.Service = (*queryService)(nil)
+	distributedQueryScope = observability.RootScope.SubScope("dquery")
+	streamScope = distributedQueryScope.SubScope("stream")
+	measureScope = distributedQueryScope.SubScope("measure")
+)
 
 type queryService struct {
 	metaService metadata.Repo
 	pipeline queue.Server
+	omr observability.MetricsRegistry
 	log *logger.Logger
 	sqp *streamQueryProcessor
 	mqp *measureQueryProcessor
@@ -55,12 +63,13 @@
 }
 
 // NewService return a new query service.
-func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster,
+func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster bus.Broadcaster, omr observability.MetricsRegistry,
 ) (run.Unit, error) {
 	svc := &queryService{
 		metaService: metaService,
 		closer: run.NewCloser(1),
 		pipeline: pipeline,
+		omr: omr,
 	}
 	svc.sqp = &streamQueryProcessor{
 		queryService: svc,
@@ -89,8 +98,10 @@ func (q *queryService) PreRun(ctx context.Context) error {
 	node := val.(common.Node)
 	q.nodeID = node.NodeID
 	q.log = logger.GetLogger(moduleName)
-	q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
-	q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log)
+	q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log,
+		schema.NewMetrics(q.omr.With(streamScope)))
+	q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log,
+		schema.NewMetrics(q.omr.With(measureScope)))
 	return multierr.Combine(
 		q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
 		q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
diff --git a/banyand/internal/storage/metrics.go b/banyand/internal/storage/metrics.go
index 54f9d81a7..a70033603 100644
--- a/banyand/internal/storage/metrics.go
+++ b/banyand/internal/storage/metrics.go
@@ -35,6 +35,8 @@ type metrics struct {
 	totalRetentionHasData meter.Counter
 	totalRetentionErr meter.Counter
 	totalRetentionHasDataLatency meter.Counter
+
+	schedulerMetrics *observability.SchedulerMetrics
 }
 
 func newMetrics(factory *observability.Factory) *metrics {
@@ -52,6 +54,7 @@ func newMetrics(factory *observability.Factory) *metrics {
 		totalRetentionErr: factory.NewCounter("total_retention_err"),
 		totalRetentionHasDataLatency: factory.NewCounter("total_retention_has_data_latency"),
 		totalRetentionHasData: factory.NewCounter("total_retention_has_data"),
+		schedulerMetrics: observability.NewSchedulerMetrics(factory),
 	}
 }
 
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index d3e0e214c..0f7d816b9 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -391,9 +391,10 @@ func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool,
 	for _, s := range sc.segments() {
 		if s.Before(deadline) {
 			hasSegment = true
+			id := s.id
 			s.delete()
 			sc.Lock()
-			sc.removeSeg(s.id)
+			sc.removeSeg(id)
 			sc.Unlock()
 			sc.l.Info().Stringer("segment", s).Msg("removed a segment")
 		}
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index d1539d185..f17dc7c22 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -167,6 +167,10 @@ func (d *database[T, O]) collect() {
 		refCount += atomic.LoadInt32(&s.refCount)
 	}
 	d.totalSegRefs.Set(float64(refCount))
+	metrics := d.scheduler.Metrics()
+	for job, m := range metrics {
+		d.metrics.schedulerMetrics.Collect(job, m)
+	}
 }
 
 type walkFn func(suffix string) error
diff --git a/banyand/liaison/grpc/metrics.go b/banyand/liaison/grpc/metrics.go
index dee5a3788..1321158f8 100644
--- a/banyand/liaison/grpc/metrics.go
+++ b/banyand/liaison/grpc/metrics.go
@@ -26,6 +26,7 @@ type metrics struct {
 	totalStarted meter.Counter
 	totalFinished meter.Counter
 	totalErr meter.Counter
+	totalPanic meter.Counter
 	totalLatency meter.Counter
 
 	totalStreamStarted meter.Counter
@@ -49,6 +50,7 @@ func newMetrics(factory *observability.Factory) *metrics {
 		totalStarted: factory.NewCounter("total_started", "group", "service", "method"),
 		totalFinished: factory.NewCounter("total_finished", "group", "service", "method"),
 		totalErr: factory.NewCounter("total_err", "group", "service", "method"),
+		totalPanic: factory.NewCounter("total_panic"),
 		totalLatency: factory.NewCounter("total_latency", "group", "service", "method"),
 		totalStreamStarted: factory.NewCounter("total_stream_started", "service", "method"),
 		totalStreamFinished: factory.NewCounter("total_stream_finished", "service", "method"),
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 31efab86f..af89eb964 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -80,6 +80,7 @@ type server struct {
 	streamSVC *streamService
 	*streamRegistryServer
 	*indexRuleBindingRegistryServer
+	metrics *metrics
 	keyFile string
 	certFile string
 	accessLogRootPath string
@@ -157,6 +158,7 @@ func (s *server) PreRun(_ context.Context) error {
 		}
 	}
 	metrics := newMetrics(s.omr.With(liaisonGrpcScope))
+	s.metrics = metrics
 	s.streamSVC.metrics = metrics
 	s.measureSVC.metrics = metrics
 	s.propertyServer.metrics = metrics
@@ -229,7 +231,7 @@ func (s *server) Serve() run.StopNotify {
 	}
 	grpcPanicRecoveryHandler := func(p any) (err error) {
 		s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")
-
+		s.metrics.totalPanic.Inc(1)
 		return status.Errorf(codes.Internal, "%s", p)
 	}
 
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index ae5006e69..b95329a18 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -632,6 +632,44 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[commo
 	}
 }
 
+func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue) {
+	r.SID = bc.bm.seriesID
+	r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx]
+	r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
+	var indexValue map[string]*modelv1.TagValue
+	if storedIndexValue != nil {
+		indexValue = storedIndexValue[r.SID]
+	}
+	for i := range r.TagFamilies {
+		tfName := r.TagFamilies[i].Name
+		var cf *columnFamily
+		for j := range r.TagFamilies[i].Tags {
+			tagName := r.TagFamilies[i].Tags[j].Name
+			if indexValue != nil && indexValue[tagName] != nil {
+				r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = indexValue[tagName]
+				continue
+			}
+			if cf == nil {
+				for i := range bc.tagFamilies {
+					if bc.tagFamilies[i].name == tfName {
+						cf = &bc.tagFamilies[i]
+						break
+					}
+				}
+			}
+			for _, c := range cf.columns {
+				if c.name == tagName {
+					r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = mustDecodeTagValue(c.valueType, c.values[bc.idx])
+					break
+				}
+			}
+		}
+	}
+	for i, c := range bc.fields.columns {
+		r.Fields[i].Values[len(r.Fields[i].Values)-1] = mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+	}
+}
+
 func (bc *blockCursor) loadData(tmpBlock *block) bool {
 	tmpBlock.reset()
 	cfm := make([]columnMetadata, 0, len(bc.fieldProjection))
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index 22c804e04..b3983407d 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -276,7 +276,7 @@ func Test_mergeParts(t *testing.T) {
 		},
 		{
 			name: "Test with multiple parts with same ts",
-			dpsList: []*dataPoints{dpsTS1, dpsTS1, dpsTS1},
+			dpsList: []*dataPoints{dpsTS11, dpsTS1},
 			want: []blockMetadata{
 				{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
 				{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 21b4076e3..83dbbc02f 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -39,6 +39,8 @@ import (
 	resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
+var metadataScope = measureScope.SubScope("metadata")
+
 // SchemaService allows querying schema information.
 type SchemaService interface {
 	Query
@@ -58,6 +60,7 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
 			svc.metadata,
 			svc.l,
 			newSupplier(path, svc),
+			resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
 		),
 	}
 	sr.start()
@@ -65,7 +68,7 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
 }
 
 // NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) SchemaService {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics *resourceSchema.Metrics) SchemaService {
 	r := &schemaRepo{
 		l: l,
 		metadata: metadata,
@@ -73,6 +76,7 @@ func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) SchemaServi
 			metadata,
 			l,
 			newPortableSupplier(metadata, l),
+			metrics,
 		),
 	}
 	r.start()
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index c9ac2117c..bf2fa1bf5 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -575,7 +575,7 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo
 		if len(result.Timestamps) > 0 &&
 			topBC.timestamps[topBC.idx] == result.Timestamps[len(result.Timestamps)-1] {
 			if topBC.versions[topBC.idx] > lastVersion {
-				logger.Panicf("following parts version should be less or equal to the previous one")
+				topBC.replace(result, storedIndexValue)
 			}
 		} else {
 			topBC.copyTo(result, storedIndexValue, tagProjection)
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index eb7bac613..593f9b04e 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/meter"
 	"github.com/apache/skywalking-banyandb/pkg/meter/native"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -45,6 +46,8 @@ const (
 var (
 	_ run.Service = (*metricService)(nil)
 	_ run.Config = (*metricService)(nil)
+
+	obScope = RootScope.SubScope("observability")
 )
 
 // Service type for Metric Service.
@@ -65,20 +68,21 @@ func NewMetricService(metadata metadata.Repo, pipeline queue.Client, nodeType st
 }
 
 type metricService struct {
-	metadata metadata.Repo
-	nodeSelector native.NodeSelector
-	pipeline queue.Client
-	scheduler *timestamp.Scheduler
-	l *logger.Logger
-	closer *run.Closer
-	svr *http.Server
-	nCollection *native.MetricCollection
-	promReg *prometheus.Registry
-	npf nativeProviderFactory
-	listenAddr string
-	nodeType string
-	modes []string
-	mutex sync.Mutex
+	metadata metadata.Repo
+	nodeSelector native.NodeSelector
+	pipeline queue.Client
+	svr *http.Server
+	l *logger.Logger
+	closer *run.Closer
+	scheduler *timestamp.Scheduler
+	nCollection *native.MetricCollection
+	promReg *prometheus.Registry
+	schedulerMetrics *SchedulerMetrics
+	npf nativeProviderFactory
+	listenAddr string
+	nodeType string
+	modes []string
+	mutex sync.Mutex
 }
 
 func (p *metricService) FlagSet() *run.FlagSet {
@@ -143,8 +147,13 @@ func (p *metricService) Serve() run.StopNotify {
 	p.initMetrics()
 	clock, _ := timestamp.GetClock(context.TODO())
 	p.scheduler = timestamp.NewScheduler(p.l, clock)
+	p.schedulerMetrics = NewSchedulerMetrics(p.With(obScope))
 	err := p.scheduler.Register("metrics-collector", cron.Descriptor, "@every 15s", func(_ time.Time, _ *logger.Logger) bool {
 		MetricsCollector.collect()
+		metrics := p.scheduler.Metrics()
+		for job, m := range metrics {
+			p.schedulerMetrics.Collect(job, m)
+		}
 		return true
 	})
 	if err != nil {
@@ -207,3 +216,35 @@ func containsMode(modes []string, mode string) bool {
 	}
 	return false
 }
+
+// SchedulerMetrics is the metrics for scheduler.
+type SchedulerMetrics struct {
+	totalJobsStarted meter.Gauge
+	totalJobsFinished meter.Gauge
+	totalTasksStarted meter.Gauge
+	totalTasksFinished meter.Gauge
+	totalTasksPanic meter.Gauge
+	totalTaskLatency meter.Gauge
+}
+
+// NewSchedulerMetrics creates a new scheduler metrics.
+func NewSchedulerMetrics(factory *Factory) *SchedulerMetrics {
+	return &SchedulerMetrics{
+		totalJobsStarted: factory.NewGauge("scheduler_jobs_started", "job"),
+		totalJobsFinished: factory.NewGauge("scheduler_jobs_finished", "job"),
+		totalTasksStarted: factory.NewGauge("scheduler_tasks_started", "job"),
+		totalTasksFinished: factory.NewGauge("scheduler_tasks_finished", "job"),
+		totalTasksPanic: factory.NewGauge("scheduler_tasks_panic", "job"),
+		totalTaskLatency: factory.NewGauge("scheduler_task_latency", "job"),
+	}
+}
+
+// Collect collects the scheduler metrics.
+func (sm *SchedulerMetrics) Collect(job string, m *timestamp.SchedulerMetrics) {
+	sm.totalJobsStarted.Set(float64(m.TotalJobsStarted.Load()), job)
+	sm.totalJobsFinished.Set(float64(m.TotalJobsFinished.Load()), job)
+	sm.totalTasksStarted.Set(float64(m.TotalTasksStarted.Load()), job)
+	sm.totalTasksFinished.Set(float64(m.TotalTasksFinished.Load()), job)
+	sm.totalTasksPanic.Set(float64(m.TotalTasksPanic.Load()), job)
+	sm.totalTaskLatency.Set(float64(m.TotalTaskLatencyInNanoseconds.Load())/float64(time.Second), job)
+}
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 58300ae89..1103b3d5d 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -40,6 +40,8 @@ import (
 	resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
+var metadataScope = streamScope.SubScope("metadata")
+
 // SchemaService allows querying schema information.
 type SchemaService interface {
 	Query
@@ -59,6 +61,7 @@ func newSchemaRepo(path string, svc *service) schemaRepo {
 			svc.metadata,
 			svc.l,
 			newSupplier(path, svc),
+			resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
 		),
 	}
 	sr.start()
@@ -66,7 +69,7 @@ func newSchemaRepo(path string, svc *service) schemaRepo {
 }
 
 // NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) SchemaService {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics *resourceSchema.Metrics) SchemaService {
 	r := &schemaRepo{
 		l: l,
 		metadata: metadata,
@@ -74,6 +77,7 @@ func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) SchemaServi
 			metadata,
 			l,
 			newPortableSupplier(metadata, l),
+			metrics,
 		),
 	}
 	r.start()
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index bbb10db39..40ea283e2 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -52,7 +52,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
 	grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, nodeRegistry, metricSvc)
 	profSvc := observability.NewProfService()
 	httpServer := http.NewServer()
-	dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
+	dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline, metricSvc)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate distributed query service")
 	}
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 5c0ff1f58..9abe21ecd 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -108,6 +108,7 @@ type schemaRepo struct {
 	l *logger.Logger
 	closer *run.ChannelCloser
 	eventCh chan MetadataEvent
+	metrics *Metrics
 	groupMap sync.Map
 	resourceMap sync.Map
 	workerNum int
@@ -136,6 +137,7 @@ func NewRepository(
 	metadata metadata.Repo,
 	l *logger.Logger,
 	resourceSupplier ResourceSupplier,
+	metrics *Metrics,
 ) Repository {
 	workNum := getWorkerNum()
 	return &schemaRepo{
@@ -146,6 +148,7 @@ func NewRepository(
 		eventCh: make(chan MetadataEvent, workNum),
 		workerNum: workNum,
 		closer: run.NewChannelCloser(),
+		metrics: metrics,
 	}
 }
 
@@ -154,6 +157,7 @@ func NewPortableRepository(
 	metadata metadata.Repo,
 	l *logger.Logger,
 	supplier ResourceSchemaSupplier,
+	metrics *Metrics,
 ) Repository {
 	workNum := getWorkerNum()
 	return &schemaRepo{
@@ -163,6 +167,7 @@ func NewPortableRepository(
 		eventCh: make(chan MetadataEvent, workNum),
 		workerNum: workNum,
 		closer: run.NewChannelCloser(),
+		metrics: metrics,
 	}
 }
 
@@ -177,6 +182,7 @@ func (sr *schemaRepo) Watcher() {
 			if err := recover(); err != nil {
 				sr.l.Warn().Interface("err", err).Msg("watching the events")
 			}
+			sr.metrics.totalPanics.Inc(1)
 		}()
 		for {
 			select {
@@ -224,8 +230,10 @@ func (sr *schemaRepo) Watcher() {
 				default:
 				}
 				sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
+				sr.metrics.totalErrs.Inc(1)
 				go func() {
 					sr.SendMetadataEvent(evt)
+					sr.metrics.totalRetries.Inc(1)
 				}()
 			}
 		case <-sr.closer.CloseNotify():
diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go
new file mode 100644
index 000000000..3ada77d57
--- /dev/null
+++ b/pkg/schema/metrics.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"github.com/apache/skywalking-banyandb/banyand/observability"
+	"github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// Metrics is a collection of metrics.
+type Metrics struct {
+	totalErrs meter.Counter
+	totalRetries meter.Counter
+	totalPanics meter.Counter
+}
+
+// NewMetrics creates a new Metrics.
+func NewMetrics(factory *observability.Factory) *Metrics {
+	return &Metrics{
+		totalErrs: factory.NewCounter("total_err"),
+		totalRetries: factory.NewCounter("total_retries"),
+		totalPanics: factory.NewCounter("total_panics"),
+	}
+}
diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go
index effadf37e..1c2394f14 100644
--- a/pkg/timestamp/scheduler.go
+++ b/pkg/timestamp/scheduler.go
@@ -20,6 +20,7 @@ package timestamp
 import (
 	"runtime/debug"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/benbjohnson/clock"
@@ -141,12 +142,24 @@ func (s *Scheduler) Close() {
 	}
 }
 
+// Metrics returns the metrics of all registered tasks.
+func (s *Scheduler) Metrics() map[string]*SchedulerMetrics {
+	s.RLock()
+	defer s.RUnlock()
+	m := make(map[string]*SchedulerMetrics, len(s.tasks))
+	for k, t := range s.tasks {
+		m[k] = t.metrics
+	}
+	return m
+}
+
 type task struct {
 	clock Clock
 	schedule cron.Schedule
 	closer *run.Closer
 	l *logger.Logger
 	action SchedulerAction
+	metrics *SchedulerMetrics
 	name string
 }
 
@@ -158,6 +171,7 @@ func newTask(l *logger.Logger, name string, clock clock.Clock, schedule cron.Sch
 		schedule: schedule,
 		action: action,
 		closer: run.NewCloser(1),
+		metrics: &SchedulerMetrics{},
 	}
 }
 
@@ -165,6 +179,8 @@ func (t *task) run() {
 	defer t.closer.Done()
 	now := t.clock.Now()
 	t.l.Info().Str("name", t.name).Time("now", now).Msg("start")
+	t.metrics.TotalJobsStarted.Add(1)
+	defer t.metrics.TotalJobsFinished.Add(1)
 	for {
 		next := t.schedule.Next(now)
 		d := next.Sub(now)
@@ -178,10 +194,15 @@ func (t *task) run() {
 			e.Str("name", t.name).Time("now", now).Msg("wake")
 		}
 		if !func() (ret bool) {
+			t.metrics.TotalTasksStarted.Add(1)
+			start := time.Now()
 			defer func() {
+				t.metrics.TotalTasksFinished.Add(1)
+				t.metrics.TotalTaskLatencyInNanoseconds.Add(time.Since(start).Nanoseconds())
 				if r := recover(); r != nil {
 					t.l.Error().Str("name", t.name).Interface("panic", r).Str("stack", string(debug.Stack())).Msg("panic")
 					ret = true
+					t.metrics.TotalTasksPanic.Add(1)
 				}
 			}()
 			return t.action(now, t.l)
@@ -200,3 +221,13 @@ func (t *task) run() {
 func (t *task) close() {
 	t.closer.CloseThenWait()
 }
+
+// SchedulerMetrics collects the metrics of a Scheduler.
+type SchedulerMetrics struct {
+	TotalJobsStarted atomic.Uint64
+	TotalJobsFinished atomic.Uint64
+	TotalTasksStarted atomic.Uint64
+	TotalTasksFinished atomic.Uint64
+	TotalTasksPanic atomic.Uint64
+	TotalTaskLatencyInNanoseconds atomic.Int64
+}
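The core pattern this patch introduces in pkg/timestamp/scheduler.go and banyand/observability/service.go is a set of lock-free atomic counters kept per task, which a periodic collector snapshots into labeled gauges. Below is a minimal, standalone sketch of that pattern using only the Go standard library; the field and metric names mirror the patch, while runOnce, printGauge, and collect are hypothetical helpers standing in for the repository's task.run, meter.Gauge.Set, and SchedulerMetrics.Collect.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// SchedulerMetrics mirrors the per-task counters added in pkg/timestamp/scheduler.go:
// tasks bump them lock-free on every run.
type SchedulerMetrics struct {
	TotalTasksStarted             atomic.Uint64
	TotalTasksFinished            atomic.Uint64
	TotalTasksPanic               atomic.Uint64
	TotalTaskLatencyInNanoseconds atomic.Int64
}

// runOnce wraps a task the way task.run does in the patch: count the start,
// record the latency, and turn a panic into a counted, recovered event.
func (m *SchedulerMetrics) runOnce(action func()) {
	m.TotalTasksStarted.Add(1)
	start := time.Now()
	defer func() {
		m.TotalTasksFinished.Add(1)
		m.TotalTaskLatencyInNanoseconds.Add(time.Since(start).Nanoseconds())
		if r := recover(); r != nil {
			m.TotalTasksPanic.Add(1)
		}
	}()
	action()
}

// printGauge is a hypothetical stand-in for meter.Gauge.Set(value, "job-name").
func printGauge(name, job string, v float64) {
	fmt.Printf("%s{job=%q} %.3f\n", name, job, v)
}

// collect snapshots the atomic counters into gauges, as
// observability.SchedulerMetrics.Collect does for every job returned by Scheduler.Metrics().
func collect(job string, m *SchedulerMetrics) {
	printGauge("scheduler_tasks_started", job, float64(m.TotalTasksStarted.Load()))
	printGauge("scheduler_tasks_finished", job, float64(m.TotalTasksFinished.Load()))
	printGauge("scheduler_tasks_panic", job, float64(m.TotalTasksPanic.Load()))
	printGauge("scheduler_task_latency", job, float64(m.TotalTaskLatencyInNanoseconds.Load())/float64(time.Second))
}

func main() {
	m := &SchedulerMetrics{}
	m.runOnce(func() { time.Sleep(10 * time.Millisecond) })
	m.runOnce(func() { panic("boom") }) // recovered inside runOnce and counted
	collect("metrics-collector", m)
}

The export side uses gauges rather than counters because each collection cycle re-reads the cumulative snapshot for every job, so the gauge value is simply overwritten with the latest totals.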