From 5e785e6338829cdca9b1996e6bf708baa3dc6855 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 7 Jun 2019 21:07:04 -0400 Subject: [PATCH] [coordinator] Add "ingest_latency" histogram metric and return datapoint too old/new errors with offending timestamps (#1716) --- .../docker-integration-tests/simple/test.sh | 9 +- src/cmd/services/m3dbnode/main/main.go | 19 ++++ src/dbnode/server/server.go | 36 ++------ src/dbnode/storage/series/buffer.go | 60 +++++++----- src/dbnode/storage/series/buffer_mock.go | 8 +- src/dbnode/storage/series/buffer_test.go | 41 +++++---- src/dbnode/storage/series/series.go | 4 +- .../api/v1/handler/prometheus/remote/write.go | 91 ++++++++++++++++--- .../handler/prometheus/remote/write_test.go | 68 +++++++++++++- src/query/api/v1/httpd/handler.go | 1 + src/query/benchmark/common/parse_json.go | 2 +- src/query/server/server.go | 43 ++++----- src/query/storage/converter.go | 16 ++-- src/query/tsdb/remote/codecs.go | 4 +- src/x/os/interrupt.go | 71 +++++++++++++++ 15 files changed, 344 insertions(+), 129 deletions(-) create mode 100644 src/x/os/interrupt.go diff --git a/scripts/docker-integration-tests/simple/test.sh b/scripts/docker-integration-tests/simple/test.sh index dce45d5ac2..75290a1a67 100755 --- a/scripts/docker-integration-tests/simple/test.sh +++ b/scripts/docker-integration-tests/simple/test.sh @@ -28,8 +28,13 @@ fi # DB initialization logic with the setup_single_m3db_node command in common.sh like the other files. Right now # we can't do that because this test doesn't use the docker-compose networking so we have to specify 127.0.0.1 # as the endpoint in the placement instead of being able to use dbnode01. -echo "Sleeping for a bit to ensure db up" -sleep 15 # TODO Replace sleeps with logic to determine when to proceed +echo "Wait for DB to be up" +ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + 'curl -vvvsSf 0.0.0.0:9002/bootstrappedinplacementornoplacement' + +echo "Wait for coordinator API to be up" +ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + 'curl -vvvsSf 0.0.0.0:7201/health' echo "Adding namespace" curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ diff --git a/src/cmd/services/m3dbnode/main/main.go b/src/cmd/services/m3dbnode/main/main.go index a93461ea27..d246ecfe37 100644 --- a/src/cmd/services/m3dbnode/main/main.go +++ b/src/cmd/services/m3dbnode/main/main.go @@ -25,6 +25,8 @@ import ( "fmt" _ "net/http/pprof" // pprof: for debug listen server if configured "os" + "os/signal" + "syscall" clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cmd/services/m3dbnode/config" @@ -33,6 +35,7 @@ import ( coordinatorserver "github.com/m3db/m3/src/query/server" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/etcd" + xos "github.com/m3db/m3/src/x/os" ) var ( @@ -62,11 +65,19 @@ func main() { } var ( + numComponents int dbClientCh chan client.Client clusterClientCh chan clusterclient.Client coordinatorDoneCh chan struct{} ) + if cfg.DB != nil { + numComponents++ + } + if cfg.Coordinator != nil { + numComponents++ + } + interruptCh := xos.NewInterruptChannel(numComponents) if cfg.DB != nil { dbClientCh = make(chan client.Client, 1) clusterClientCh = make(chan clusterclient.Client, 1) @@ -80,6 +91,7 @@ func main() { DBConfig: cfg.DB, DBClient: dbClientCh, ClusterClient: clusterClientCh, + InterruptCh: interruptCh, }) coordinatorDoneCh <- struct{}{} }() @@ -90,8 +102,15 @@ func main() { Config: *cfg.DB, ClientCh: dbClientCh, ClusterClientCh: clusterClientCh, + InterruptCh: 
interruptCh, }) } else if cfg.Coordinator != nil { <-coordinatorDoneCh } } + +func interrupt() <-chan os.Signal { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + return c +} diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 8d00682aef..6a4d2fc059 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -27,12 +27,10 @@ import ( "math" "net/http" "os" - "os/signal" "path" "runtime" "runtime/debug" "strings" - "syscall" "time" clusterclient "github.com/m3db/m3/src/cluster/client" @@ -720,13 +718,13 @@ func Run(runOpts RunOptions) { go func() { if runOpts.BootstrapCh != nil { - // Notify on bootstrap chan if specified + // Notify on bootstrap chan if specified. defer func() { runOpts.BootstrapCh <- struct{}{} }() } - // Bootstrap asynchronously so we can handle interrupt + // Bootstrap asynchronously so we can handle interrupt. if err := db.Bootstrap(); err != nil { logger.Fatal("could not bootstrap database", zap.Error(err)) } @@ -737,24 +735,12 @@ func Run(runOpts RunOptions) { runtimeOptsMgr, cfg.WriteNewSeriesLimitPerSecond) }() - // Handle interrupt - interruptCh := runOpts.InterruptCh - if interruptCh == nil { - // Make a noop chan so we can always select - interruptCh = make(chan error) - } - - var interruptErr error - select { - case err := <-interruptCh: - interruptErr = err - case sig := <-interrupt(): - interruptErr = fmt.Errorf("%v", sig) - } - - logger.Warn("interrupt", zap.Error(interruptErr)) + // Wait for process interrupt. + xos.WaitForInterrupt(logger, xos.InterruptOptions{ + InterruptCh: runOpts.InterruptCh, + }) - // Attempt graceful server close + // Attempt graceful server close. closedCh := make(chan struct{}) go func() { err := db.Terminate() @@ -764,7 +750,7 @@ func Run(runOpts RunOptions) { closedCh <- struct{}{} }() - // Wait then close or hard close + // Wait then close or hard close. closeTimeout := serverGracefulCloseTimeout select { case <-closedCh: @@ -774,12 +760,6 @@ func Run(runOpts RunOptions) { } } -func interrupt() <-chan os.Signal { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - return c -} - func bgValidateProcessLimits(logger *zap.Logger) { // If unable to validate process limits on the current configuration, // do not run background validator task. 
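// A minimal standalone sketch (illustration only, not part of the patch) of the
// interrupt-then-graceful-close flow that server.go retains above: wait for the
// shared interrupt, attempt a graceful close in a goroutine, and fall back to a
// hard close after a timeout. The terminate helper and the 2s timeout are
// hypothetical stand-ins for db.Terminate and serverGracefulCloseTimeout.
package main

import (
	"log"
	"time"
)

// terminate is a placeholder for db.Terminate.
func terminate() error {
	time.Sleep(500 * time.Millisecond)
	return nil
}

func main() {
	// Attempt graceful server close.
	closedCh := make(chan struct{})
	go func() {
		if err := terminate(); err != nil {
			log.Printf("error closing database: %v", err)
		}
		closedCh <- struct{}{}
	}()

	// Wait then close or hard close.
	const closeTimeout = 2 * time.Second // stand-in for serverGracefulCloseTimeout
	select {
	case <-closedCh:
		log.Println("server closed")
	case <-time.After(closeTimeout):
		log.Println("server closed after timeout", closeTimeout.String())
	}
}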
diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index 02b64e64d7..66ac30d493 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/context" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" @@ -48,6 +49,7 @@ import ( const ( errBucketMapCacheNotInSync = "bucket map keys do not match sorted keys cache" errBucketMapCacheNotInSyncFmt = errBucketMapCacheNotInSync + ", blockStart: %d" + errTimestampFormat = time.RFC822Z ) var ( @@ -135,7 +137,7 @@ type databaseBuffer interface { Bootstrap(bl block.DatabaseBlock) - Reset(opts Options) + Reset(id ident.ID, opts Options) } type bufferStats struct { @@ -203,6 +205,7 @@ func (t *OptimizedTimes) ForEach(fn func(t xtime.UnixNano)) { } type dbBuffer struct { + id ident.ID opts Options nowFn clock.NowFn @@ -236,7 +239,8 @@ func newDatabaseBuffer() databaseBuffer { return b } -func (b *dbBuffer) Reset(opts Options) { +func (b *dbBuffer) Reset(id ident.ID, opts Options) { + b.id = id b.opts = opts b.nowFn = opts.ClockOptions().NowFn() ropts := opts.RetentionOptions() @@ -250,20 +254,6 @@ func (b *dbBuffer) Reset(opts Options) { b.futureRetentionPeriod = ropts.FutureRetentionPeriod() } -// ResolveWriteType returns whether a write is a cold write or warm write. -func (b *dbBuffer) ResolveWriteType( - timestamp time.Time, - now time.Time, -) WriteType { - pastLimit := now.Add(-1 * b.bufferPast) - futureLimit := now.Add(b.bufferFuture) - if !pastLimit.Before(timestamp) || !futureLimit.After(timestamp) { - return ColdWrite - } - - return WarmWrite -} - func (b *dbBuffer) Write( ctx context.Context, timestamp time.Time, @@ -272,14 +262,42 @@ func (b *dbBuffer) Write( annotation []byte, wOpts WriteOptions, ) (bool, error) { - now := b.nowFn() - writeType := b.ResolveWriteType(timestamp, now) - - if writeType == ColdWrite { + var ( + now = b.nowFn() + pastLimit = now.Add(-1 * b.bufferPast) + futureLimit = now.Add(b.bufferFuture) + writeType WriteType + ) + switch { + case !pastLimit.Before(timestamp): + writeType = ColdWrite + if !b.coldWritesEnabled { + return false, xerrors.NewInvalidParamsError( + fmt.Errorf("datapoint too far in past: "+ + "id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+ + "timestamp_unix_nanos=%d, past_limit_unix_nanos=%d", + b.id.Bytes(), pastLimit.Sub(timestamp).String(), + timestamp.Format(errTimestampFormat), + pastLimit.Format(errTimestampFormat), + timestamp.UnixNano(), pastLimit.UnixNano())) + } + case !futureLimit.After(timestamp): + writeType = ColdWrite if !b.coldWritesEnabled { - return false, m3dberrors.ErrColdWritesNotEnabled + return false, xerrors.NewInvalidParamsError( + fmt.Errorf("datapoint too far in future: "+ + "id=%s, off_by=%s, timestamp=%s, future_limit=%s, "+ + "timestamp_unix_nanos=%d, future_limit_unix_nanos=%d", + b.id.Bytes(), timestamp.Sub(futureLimit).String(), + timestamp.Format(errTimestampFormat), + futureLimit.Format(errTimestampFormat), + timestamp.UnixNano(), futureLimit.UnixNano())) } + default: + writeType = WarmWrite + } + if writeType == ColdWrite { if now.Add(-b.retentionPeriod).After(timestamp) { return false, m3dberrors.ErrTooPast } diff --git a/src/dbnode/storage/series/buffer_mock.go b/src/dbnode/storage/series/buffer_mock.go index 026c7ca8f9..31b919cdfa 100644 --- 
a/src/dbnode/storage/series/buffer_mock.go +++ b/src/dbnode/storage/series/buffer_mock.go @@ -234,13 +234,13 @@ func (mr *MockdatabaseBufferMockRecorder) Bootstrap(bl interface{}) *gomock.Call } // Reset mocks base method -func (m *MockdatabaseBuffer) Reset(opts Options) { +func (m *MockdatabaseBuffer) Reset(id ident.ID, opts Options) { m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", opts) + m.ctrl.Call(m, "Reset", id, opts) } // Reset indicates an expected call of Reset -func (mr *MockdatabaseBufferMockRecorder) Reset(opts interface{}) *gomock.Call { +func (mr *MockdatabaseBufferMockRecorder) Reset(id, opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockdatabaseBuffer)(nil).Reset), opts) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockdatabaseBuffer)(nil).Reset), id, opts) } diff --git a/src/dbnode/storage/series/buffer_test.go b/src/dbnode/storage/series/buffer_test.go index 255f7d217b..3500d7c67a 100644 --- a/src/dbnode/storage/series/buffer_test.go +++ b/src/dbnode/storage/series/buffer_test.go @@ -23,6 +23,7 @@ package series import ( "io" "sort" + "strings" "testing" "time" @@ -91,7 +92,7 @@ func TestBufferWriteTooFuture(t *testing.T) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) ctx := context.NewContext() defer ctx.Close() @@ -100,6 +101,10 @@ func TestBufferWriteTooFuture(t *testing.T) { assert.False(t, wasWritten) assert.Error(t, err) assert.True(t, xerrors.IsInvalidParams(err)) + assert.True(t, strings.Contains(err.Error(), "datapoint too far in future")) + assert.True(t, strings.Contains(err.Error(), "id=foo")) + assert.True(t, strings.Contains(err.Error(), "timestamp=")) + assert.True(t, strings.Contains(err.Error(), "future_limit=")) } func TestBufferWriteTooPast(t *testing.T) { @@ -110,7 +115,7 @@ func TestBufferWriteTooPast(t *testing.T) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) ctx := context.NewContext() defer ctx.Close() wasWritten, err := buffer.Write(ctx, curr.Add(-1*rops.BufferPast()), 1, xtime.Second, @@ -118,6 +123,10 @@ func TestBufferWriteTooPast(t *testing.T) { assert.False(t, wasWritten) assert.Error(t, err) assert.True(t, xerrors.IsInvalidParams(err)) + assert.True(t, strings.Contains(err.Error(), "datapoint too far in past")) + assert.True(t, strings.Contains(err.Error(), "id=foo")) + assert.True(t, strings.Contains(err.Error(), "timestamp=")) + assert.True(t, strings.Contains(err.Error(), "past_limit=")) } func TestBufferWriteError(t *testing.T) { @@ -131,7 +140,7 @@ func TestBufferWriteError(t *testing.T) { opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(func() time.Time { return curr })) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) defer ctx.Close() timeUnitNotExist := xtime.Unit(127) @@ -152,7 +161,7 @@ func testBufferWriteRead(t *testing.T, opts Options, setAnn setAnnotation) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) data := []value{ {curr.Add(secs(1)), 1, xtime.Second, nil}, @@ -188,7 +197,7 @@ func TestBufferReadOnlyMatchingBuckets(t *testing.T) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) data := []value{ {curr.Add(mins(1)), 1, xtime.Second, nil}, @@ -228,7 +237,7 @@ func 
TestBufferWriteOutOfOrder(t *testing.T) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) data := []value{ {curr, 1, xtime.Second, nil}, @@ -367,7 +376,7 @@ func newTestBufferWithCustomData( setAnn setAnnotation, ) (*dbBuffer, map[xtime.UnixNano][]value) { buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) expectedMap := make(map[xtime.UnixNano][]value) for _, bd := range blockDatas { @@ -587,7 +596,7 @@ func TestIndexedBufferWriteOnlyWritesSinglePoint(t *testing.T) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) data := []value{ {curr.Add(secs(1)), 1, xtime.Second, nil}, @@ -638,7 +647,7 @@ func testBufferFetchBlocks(t *testing.T, opts Options, setAnn setAnnotation) { defer ctx.Close() buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) buffer.bucketsMap[xtime.ToUnixNano(b.start)] = b nsCtx := namespace.Context{} @@ -720,7 +729,7 @@ func TestBufferFetchBlocksOneResultPerBlock(t *testing.T) { defer ctx.Close() buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) buffer.bucketsMap[xtime.ToUnixNano(b.start)] = b res := buffer.FetchBlocks(ctx, []time.Time{b.start, b.start.Add(time.Second)}, namespace.Context{}) @@ -744,7 +753,7 @@ func TestBufferFetchBlocksMetadata(t *testing.T) { end := b.start.Add(time.Second) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) buffer.bucketsMap[xtime.ToUnixNano(b.start)] = b buffer.inOrderBlockStarts = append(buffer.inOrderBlockStarts, b.start) @@ -780,7 +789,7 @@ func TestBufferTickReordersOutOfOrderBuffers(t *testing.T) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) // Perform out of order writes that will create two in order encoders. data := []value{ @@ -867,7 +876,7 @@ func TestBufferRemoveBucket(t *testing.T) { return curr })) buffer := newDatabaseBuffer().(*dbBuffer) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) // Perform out of order writes that will create two in order encoders. data := []value{ @@ -958,7 +967,7 @@ func testBufferWithEmptyEncoder(t *testing.T, testSnapshot bool) { opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(func() time.Time { return curr })) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) // Perform one valid write to setup the state of the buffer. ctx := context.NewContext() @@ -1023,7 +1032,7 @@ func testBufferSnapshot(t *testing.T, opts Options, setAnn setAnnotation) { opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(func() time.Time { return curr })) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) // Create test data to perform out of order writes that will create two in-order // encoders so we can verify that Snapshot will perform a merge. @@ -1125,7 +1134,7 @@ func TestBufferSnapshotWithColdWrites(t *testing.T) { opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(func() time.Time { return curr })) - buffer.Reset(opts) + buffer.Reset(ident.StringID("foo"), opts) // Create test data to perform warm writes that will create two in-order // encoders so we can verify that Snapshot will perform a merge. 
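// A minimal standalone sketch (illustration only, not part of the patch) of the
// warm/cold boundary checks that buffer.Write performs above: a timestamp at or
// before now-bufferPast, or at or after now+bufferFuture, is a cold write, and
// when cold writes are disabled it is rejected with the offending timestamp and
// offset. The classify helper and the sample buffer windows are hypothetical.
package main

import (
	"fmt"
	"time"
)

func classify(timestamp, now time.Time, bufferPast, bufferFuture time.Duration) string {
	pastLimit := now.Add(-1 * bufferPast)
	futureLimit := now.Add(bufferFuture)
	switch {
	case !pastLimit.Before(timestamp): // timestamp <= pastLimit
		return fmt.Sprintf("cold: datapoint too far in past, off_by=%s", pastLimit.Sub(timestamp))
	case !futureLimit.After(timestamp): // timestamp >= futureLimit
		return fmt.Sprintf("cold: datapoint too far in future, off_by=%s", timestamp.Sub(futureLimit))
	default:
		return "warm"
	}
}

func main() {
	var (
		now          = time.Now()
		bufferPast   = 10 * time.Minute
		bufferFuture = 2 * time.Minute
	)
	for _, ts := range []time.Time{
		now,                        // inside the window: warm
		now.Add(-11 * time.Minute), // beyond bufferPast: cold
		now.Add(3 * time.Minute),   // beyond bufferFuture: cold
	} {
		fmt.Println(ts.Sub(now), "=>", classify(ts, now, bufferPast, bufferFuture))
	}
}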
diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index 571a2b39af..f8a38a7bc4 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -594,7 +594,7 @@ func (s *dbSeries) Close() { // Reset (not close) underlying resources because the series will go // back into the pool and be re-used. - s.buffer.Reset(s.opts) + s.buffer.Reset(nil, s.opts) s.cachedBlocks.Reset() if s.pool != nil { @@ -632,7 +632,7 @@ func (s *dbSeries) Reset( s.tags = tags s.cachedBlocks.Reset() - s.buffer.Reset(opts) + s.buffer.Reset(id, opts) s.opts = opts s.bs = bootstrapNotStarted s.blockRetriever = blockRetriever diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index 6514063653..0d02b717ed 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/dbnode/client" @@ -35,6 +36,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/query/util/logging" + "github.com/m3db/m3/src/x/clock" xerrors "github.com/m3db/m3/src/x/errors" xhttp "github.com/m3db/m3/src/x/net/http" xtime "github.com/m3db/m3/src/x/time" @@ -59,50 +61,111 @@ var ( // PromWriteHandler represents a handler for prometheus write endpoint. type PromWriteHandler struct { downsamplerAndWriter ingest.DownsamplerAndWriter - promWriteMetrics promWriteMetrics tagOptions models.TagOptions + nowFn clock.NowFn + metrics promWriteMetrics } // NewPromWriteHandler returns a new instance of handler. func NewPromWriteHandler( downsamplerAndWriter ingest.DownsamplerAndWriter, tagOptions models.TagOptions, + nowFn clock.NowFn, scope tally.Scope, ) (http.Handler, error) { if downsamplerAndWriter == nil { return nil, errNoDownsamplerAndWriter } + metrics, err := newPromWriteMetrics(scope) + if err != nil { + return nil, err + } + return &PromWriteHandler{ downsamplerAndWriter: downsamplerAndWriter, - promWriteMetrics: newPromWriteMetrics(scope), tagOptions: tagOptions, + nowFn: nowFn, + metrics: metrics, }, nil } type promWriteMetrics struct { - writeSuccess tally.Counter - writeErrorsServer tally.Counter - writeErrorsClient tally.Counter + writeSuccess tally.Counter + writeErrorsServer tally.Counter + writeErrorsClient tally.Counter + ingestLatency tally.Histogram + ingestLatencyBuckets tally.DurationBuckets } -func newPromWriteMetrics(scope tally.Scope) promWriteMetrics { - return promWriteMetrics{ - writeSuccess: scope.Counter("write.success"), - writeErrorsServer: scope.Tagged(map[string]string{"code": "5XX"}).Counter("write.errors"), - writeErrorsClient: scope.Tagged(map[string]string{"code": "4XX"}).Counter("write.errors"), +func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) { + upTo1sBuckets, err := tally.LinearDurationBuckets(0, 100*time.Millisecond, 10) + if err != nil { + return promWriteMetrics{}, err + } + + upTo10sBuckets, err := tally.LinearDurationBuckets(time.Second, 500*time.Millisecond, 18) + if err != nil { + return promWriteMetrics{}, err + } + + upTo60sBuckets, err := tally.LinearDurationBuckets(10*time.Second, 5*time.Second, 11) + if err != nil { + return promWriteMetrics{}, err + } + + upTo60mBuckets, err := tally.LinearDurationBuckets(0, 5*time.Minute, 12) + if err != nil { + return promWriteMetrics{}, err + } + upTo60mBuckets = 
upTo60mBuckets[1:] // Remove the first 0s to get 5 min aligned buckets + + upTo6hBuckets, err := tally.LinearDurationBuckets(time.Hour, 30*time.Minute, 12) + if err != nil { + return promWriteMetrics{}, err + } + + upTo24hBuckets, err := tally.LinearDurationBuckets(6*time.Hour, time.Hour, 19) + if err != nil { + return promWriteMetrics{}, err } + upTo24hBuckets = upTo24hBuckets[1:] // Remove the first 6h to get 1 hour aligned buckets + + var ingestLatencyBuckets tally.DurationBuckets + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo1sBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo10sBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60sBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60mBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...) + return promWriteMetrics{ + writeSuccess: scope.SubScope("write").Counter("success"), + writeErrorsServer: scope.SubScope("write").Tagged(map[string]string{"code": "5XX"}).Counter("errors"), + writeErrorsClient: scope.SubScope("write").Tagged(map[string]string{"code": "4XX"}).Counter("errors"), + ingestLatency: scope.SubScope("ingest").Histogram("latency", ingestLatencyBuckets), + ingestLatencyBuckets: ingestLatencyBuckets, + }, nil } func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { req, rErr := h.parseRequest(r) if rErr != nil { - h.promWriteMetrics.writeErrorsClient.Inc(1) + h.metrics.writeErrorsClient.Inc(1) xhttp.Error(w, rErr.Inner(), rErr.Code()) return } batchErr := h.write(r.Context(), req) + + // Record ingestion delay latency + now := h.nowFn() + for _, series := range req.Timeseries { + for _, sample := range series.Samples { + age := now.Sub(storage.PromTimestampToTime(sample.Timestamp)) + h.metrics.ingestLatency.RecordDuration(age) + } + } + if batchErr != nil { var ( errs = batchErr.Errors() @@ -129,10 +192,10 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case numBadRequest == len(errs): status = http.StatusBadRequest - h.promWriteMetrics.writeErrorsClient.Inc(1) + h.metrics.writeErrorsClient.Inc(1) default: status = http.StatusInternalServerError - h.promWriteMetrics.writeErrorsServer.Inc(1) + h.metrics.writeErrorsServer.Inc(1) } logger := logging.WithContext(r.Context()) @@ -161,7 +224,7 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - h.promWriteMetrics.writeSuccess.Inc(1) + h.metrics.writeSuccess.Inc(1) } func (h *PromWriteHandler) parseRequest(r *http.Request) (*prompb.WriteRequest, *xhttp.ParseError) { diff --git a/src/query/api/v1/handler/prometheus/remote/write_test.go b/src/query/api/v1/handler/prometheus/remote/write_test.go index 3a0c1f56fc..08fd83cbc4 100644 --- a/src/query/api/v1/handler/prometheus/remote/write_test.go +++ b/src/query/api/v1/handler/prometheus/remote/write_test.go @@ -24,7 +24,9 @@ import ( "bytes" "context" "errors" + "fmt" "io/ioutil" + "math" "net/http" "net/http/httptest" "testing" @@ -50,7 +52,7 @@ func TestPromWriteParsing(t *testing.T) { promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) - req, _ := http.NewRequest("POST", PromWriteURL, promReqBody) + req := httptest.NewRequest("POST", PromWriteURL, promReqBody) r, err := promWrite.parseRequest(req) require.Nil(t, err, "unable to parse request") @@ -68,7 +70,7 @@ func TestPromWrite(t *testing.T) { promReq := 
test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) - req, _ := http.NewRequest("POST", PromWriteURL, promReqBody) + req := httptest.NewRequest("POST", PromWriteURL, promReqBody) r, err := promWrite.parseRequest(req) require.Nil(t, err, "unable to parse request") @@ -90,7 +92,7 @@ func TestPromWriteError(t *testing.T) { Return(batchErr) promWrite, err := NewPromWriteHandler(mockDownsamplerAndWriter, - models.NewTagOptions(), tally.NoopScope) + models.NewTagOptions(), time.Now, tally.NoopScope) require.NoError(t, err) promReq := test.GeneratePromWriteRequest() @@ -118,11 +120,13 @@ func TestWriteErrorMetricCount(t *testing.T) { reporter := xmetrics.NewTestStatsReporter(xmetrics.NewTestStatsReporterOptions()) scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Millisecond) defer closer.Close() - writeMetrics := newPromWriteMetrics(scope) + writeMetrics, err := newPromWriteMetrics(scope) + require.NoError(t, err) promWrite := &PromWriteHandler{ downsamplerAndWriter: mockDownsamplerAndWriter, - promWriteMetrics: writeMetrics, + nowFn: time.Now, + metrics: writeMetrics, } req, _ := http.NewRequest("POST", PromWriteURL, nil) promWrite.ServeHTTP(httptest.NewRecorder(), req) @@ -133,3 +137,57 @@ func TestWriteErrorMetricCount(t *testing.T) { }, 5*time.Second) require.True(t, foundMetric) } + +func TestWriteDatapointDelayMetric(t *testing.T) { + logging.InitWithCores(nil) + + ctrl := gomock.NewController(t) + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + mockDownsamplerAndWriter.EXPECT().WriteBatch(gomock.Any(), gomock.Any()) + + scope := tally.NewTestScope("", map[string]string{"test": "delay-metric-test"}) + + handler, err := NewPromWriteHandler(mockDownsamplerAndWriter, + models.NewTagOptions(), time.Now, scope) + require.NoError(t, err) + + writeHandler, ok := handler.(*PromWriteHandler) + require.True(t, ok) + + buckets := writeHandler.metrics.ingestLatencyBuckets + + // NB(r): Bucket length is tested just to sanity check how many buckets we are creating + require.Equal(t, 80, len(buckets.AsDurations())) + + // NB(r): Bucket values are tested to sanity check they look right + expected := "[0s 100ms 200ms 300ms 400ms 500ms 600ms 700ms 800ms 900ms 1s 1.5s 2s 2.5s 3s 3.5s 4s 4.5s 5s 5.5s 6s 6.5s 7s 7.5s 8s 8.5s 9s 9.5s 10s 15s 20s 25s 30s 35s 40s 45s 50s 55s 1m0s 5m0s 10m0s 15m0s 20m0s 25m0s 30m0s 35m0s 40m0s 45m0s 50m0s 55m0s 1h0m0s 1h30m0s 2h0m0s 2h30m0s 3h0m0s 3h30m0s 4h0m0s 4h30m0s 5h0m0s 5h30m0s 6h0m0s 6h30m0s 7h0m0s 8h0m0s 9h0m0s 10h0m0s 11h0m0s 12h0m0s 13h0m0s 14h0m0s 15h0m0s 16h0m0s 17h0m0s 18h0m0s 19h0m0s 20h0m0s 21h0m0s 22h0m0s 23h0m0s 24h0m0s]" + actual := fmt.Sprintf("%v", buckets.AsDurations()) + require.Equal(t, expected, actual) + + // Ensure buckets increasing in order + lastValue := time.Duration(math.MinInt64) + for _, value := range buckets.AsDurations() { + require.True(t, value > lastValue, + fmt.Sprintf("%s must be greater than last bucket value %s", value, lastValue)) + lastValue = value + } + + promReq := test.GeneratePromWriteRequest() + promReqBody := test.GeneratePromWriteRequestBody(t, promReq) + req := httptest.NewRequest("POST", PromWriteURL, promReqBody) + handler.ServeHTTP(httptest.NewRecorder(), req) + + foundMetric := xclock.WaitUntil(func() bool { + values, found := scope.Snapshot().Histograms()["ingest.latency+test=delay-metric-test"] + if !found { + return false + } + for _, valuesInBucket := range values.Durations() { + if valuesInBucket > 0 { + return true + } + } 
+ return false + }, 5*time.Second) + require.True(t, foundMetric) +} diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 0eebc60836..95008d031c 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -180,6 +180,7 @@ func (h *Handler) RegisterRoutes() error { promRemoteWriteHandler, err := remote.NewPromWriteHandler( h.downsamplerAndWriter, h.tagOptions, + nowFn, h.scope.Tagged(remoteSource)) if err != nil { return err diff --git a/src/query/benchmark/common/parse_json.go b/src/query/benchmark/common/parse_json.go index 9cf1a540c6..ae91017e9c 100644 --- a/src/query/benchmark/common/parse_json.go +++ b/src/query/benchmark/common/parse_json.go @@ -144,7 +144,7 @@ func unmarshalMetrics(dataChannel <-chan []byte, metricChannel chan<- *M3Metric) log.Fatalf("failed to unmarshal metrics, got error: %v\n", err) } - metricChannel <- &M3Metric{ID: id(m.Tags, m.Name), Time: storage.TimestampToTime(m.Time), Value: m.Value} + metricChannel <- &M3Metric{ID: id(m.Tags, m.Name), Time: storage.PromTimestampToTime(m.Time), Value: m.Value} } } diff --git a/src/query/server/server.go b/src/query/server/server.go index dd26adbf06..e51282f10f 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -27,10 +27,8 @@ import ( "net" "net/http" "os" - "os/signal" "sort" "strings" - "syscall" "time" clusterclient "github.com/m3db/m3/src/cluster/client" @@ -60,6 +58,7 @@ import ( "github.com/m3db/m3/src/x/clock" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/instrument" + xos "github.com/m3db/m3/src/x/os" "github.com/m3db/m3/src/x/pool" "github.com/m3db/m3/src/x/serialize" xserver "github.com/m3db/m3/src/x/server" @@ -166,6 +165,9 @@ func Run(runOpts RunOptions) { // Close metrics scope defer func() { + if e := recover(); e != nil { + logger.Warn("recovered from panic", zap.String("e", fmt.Sprintf("%v", e))) + } logger.Info("closing metrics scope") if err := closer.Close(); err != nil { logger.Error("unable to close metrics scope", zap.Error(err)) @@ -339,31 +341,17 @@ func Run(runOpts RunOptions) { } if cfg.Carbon != nil && cfg.Carbon.Ingester != nil { - startCarbonIngestion( - cfg.Carbon, instrumentOptions, logger, m3dbClusters, downsamplerAndWriter) - } - - var interruptCh <-chan error = make(chan error) - if runOpts.InterruptCh != nil { - interruptCh = runOpts.InterruptCh - } - - var interruptErr error - if runOpts.DBConfig != nil { - interruptErr = <-interruptCh - } else { - // Only use this if running standalone, as otherwise it will - // obfuscate signal channel for the db - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - select { - case sig := <-sigChan: - interruptErr = fmt.Errorf("%v", sig) - case interruptErr = <-interruptCh: + server, ok := startCarbonIngestion(cfg.Carbon, instrumentOptions, + logger, m3dbClusters, downsamplerAndWriter) + if ok { + defer server.Close() } } - logger.Info("interrupt", zap.String("cause", interruptErr.Error())) + // Wait for process interrupt. 
+ xos.WaitForInterrupt(logger, xos.InterruptOptions{ + InterruptCh: runOpts.InterruptCh, + }) } // make connections to the m3db cluster(s) and generate sessions for those clusters along with the storage @@ -850,7 +838,7 @@ func startCarbonIngestion( logger *zap.Logger, m3dbClusters m3.Clusters, downsamplerAndWriter ingest.DownsamplerAndWriter, -) { +) (xserver.Server, bool) { ingesterCfg := cfg.Ingester logger.Info("carbon ingestion enabled, configuring ingester") @@ -930,7 +918,7 @@ func startCarbonIngestion( if len(rules.Rules) == 0 { logger.Warn("no carbon ingestion rules were provided and no aggregated M3DB namespaces exist, carbon metrics will not be ingested") - return + return nil, false } if len(ingesterCfg.Rules) == 0 { @@ -964,7 +952,10 @@ func startCarbonIngestion( logger.Fatal("unable to start carbon ingestion server at listen address", zap.String("listenAddress", carbonListenAddress), zap.Error(err)) } + logger.Info("started carbon ingestion server", zap.String("listenAddress", carbonListenAddress)) + + return carbonServer, true } func newDownsamplerAndWriter(storage storage.Storage, downsampler downsample.Downsampler) (ingest.DownsamplerAndWriter, error) { diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 53c11baccd..5e65c37b5f 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -95,7 +95,7 @@ func PromLabelsToM3Tags( func PromSamplesToM3Datapoints(samples []*prompb.Sample) ts.Datapoints { datapoints := make(ts.Datapoints, 0, len(samples)) for _, sample := range samples { - timestamp := TimestampToTime(sample.Timestamp) + timestamp := PromTimestampToTime(sample.Timestamp) datapoints = append(datapoints, ts.Datapoint{Timestamp: timestamp, Value: sample.Value}) } @@ -111,8 +111,8 @@ func PromReadQueryToM3(query *prompb.Query) (*FetchQuery, error) { return &FetchQuery{ TagMatchers: tagMatchers, - Start: TimestampToTime(query.StartTimestampMs), - End: TimestampToTime(query.EndTimestampMs), + Start: PromTimestampToTime(query.StartTimestampMs), + End: PromTimestampToTime(query.EndTimestampMs), }, nil } @@ -157,13 +157,13 @@ func PromTypeToM3(labelType prompb.LabelMatcher_Type) (models.MatchType, error) } } -// TimestampToTime converts a prometheus timestamp to time.Time -func TimestampToTime(timestampMS int64) time.Time { +// PromTimestampToTime converts a prometheus timestamp to time.Time. +func PromTimestampToTime(timestampMS int64) time.Time { return time.Unix(0, timestampMS*int64(time.Millisecond)) } -// TimeToTimestamp converts a time.Time to prometheus timestamp -func TimeToTimestamp(timestamp time.Time) int64 { +// TimeToPromTimestamp converts a time.Time to prometheus timestamp. 
+func TimeToPromTimestamp(timestamp time.Time) int64 { // Significantly faster than time.Truncate() return timestamp.UnixNano() / int64(time.Millisecond) } @@ -253,7 +253,7 @@ func SeriesToPromSamples(series *ts.Series) []*prompb.Sample { ) for _, dp := range datapoints { samples = append(samples, prompb.Sample{ - Timestamp: TimeToTimestamp(dp.Timestamp), + Timestamp: TimeToPromTimestamp(dp.Timestamp), Value: dp.Value, }) } diff --git a/src/query/tsdb/remote/codecs.go b/src/query/tsdb/remote/codecs.go index af6028f93b..b348f2a21c 100644 --- a/src/query/tsdb/remote/codecs.go +++ b/src/query/tsdb/remote/codecs.go @@ -40,11 +40,11 @@ import ( const reqIDKey = "reqid" func fromTime(t time.Time) int64 { - return storage.TimeToTimestamp(t) + return storage.TimeToPromTimestamp(t) } func toTime(t int64) time.Time { - return storage.TimestampToTime(t) + return storage.PromTimestampToTime(t) } func encodeTags(tags models.Tags) []*rpc.Tag { diff --git a/src/x/os/interrupt.go b/src/x/os/interrupt.go new file mode 100644 index 0000000000..d09b882407 --- /dev/null +++ b/src/x/os/interrupt.go @@ -0,0 +1,71 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package xos + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + "go.uber.org/zap" +) + +// InterruptOptions are options to use when waiting for an interrupt. +type InterruptOptions struct { + // InterruptChannel is an existing interrupt channel, if none + // specified one will be created. + InterruptCh <-chan error +} + +// WaitForInterrupt will wait for an interrupt to occur and return when done. +func WaitForInterrupt(logger *zap.Logger, opts InterruptOptions) { + // Handle interrupts. + interruptCh := opts.InterruptCh + if interruptCh == nil { + // Need to catch our own interrupts. + interruptCh = NewInterruptChannel(1) + logger.Info("registered new interrupt handler") + } else { + logger.Info("using registered interrupt handler") + } + + logger.Warn("interrupt", zap.Error(<-interruptCh)) +} + +// NewInterruptChannel will return an interrupt channel useful with multiple +// listeners. +func NewInterruptChannel(numListeners int) <-chan error { + interruptCh := make(chan error, numListeners) + go func() { + err := fmt.Errorf("%v", <-interrupt()) + for i := 0; i < numListeners; i++ { + interruptCh <- err + } + }() + return interruptCh +} + +func interrupt() <-chan os.Signal { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + return c +}
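// A minimal usage sketch (illustration only, not part of the patch) of the
// fan-out interrupt channel defined in interrupt.go above: when both a
// coordinator and a dbnode run in the same process, main.go sizes the channel
// for two listeners so each component observes the same interrupt error. The
// component names below are illustrative.
package main

import (
	"fmt"
	"sync"

	xos "github.com/m3db/m3/src/x/os"
)

func main() {
	// One buffered slot per listener so neither component misses the signal.
	interruptCh := xos.NewInterruptChannel(2)

	var wg sync.WaitGroup
	for _, name := range []string{"coordinator", "dbnode"} {
		name := name
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Printf("%s shutting down: %v\n", name, <-interruptCh)
		}()
	}
	wg.Wait()
}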