Skip to content

Commit

Permalink
[coordinator] Add "ingest_latency" histogram metric and return datapo…
Browse files Browse the repository at this point in the history
…int too old/new errors with offending timestamps (#1716)
  • Loading branch information
robskillington authored Jun 8, 2019
1 parent 78d138a commit 5e785e6
Show file tree
Hide file tree
Showing 15 changed files with 344 additions and 129 deletions.
9 changes: 7 additions & 2 deletions scripts/docker-integration-tests/simple/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ fi
# DB initialization logic with the setup_single_m3db_node command in common.sh like the other files. Right now
# we can't do that because this test doesn't use the docker-compose networking so we have to specify 127.0.0.1
# as the endpoint in the placement instead of being able to use dbnode01.
echo "Sleeping for a bit to ensure db up"
sleep 15 # TODO Replace sleeps with logic to determine when to proceed
echo "Wait for DB to be up"
ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
'curl -vvvsSf 0.0.0.0:9002/bootstrappedinplacementornoplacement'

echo "Wait for coordinator API to be up"
ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \
'curl -vvvsSf 0.0.0.0:7201/health'

echo "Adding namespace"
curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{
Expand Down
19 changes: 19 additions & 0 deletions src/cmd/services/m3dbnode/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"fmt"
_ "net/http/pprof" // pprof: for debug listen server if configured
"os"
"os/signal"
"syscall"

clusterclient "github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cmd/services/m3dbnode/config"
Expand All @@ -33,6 +35,7 @@ import (
coordinatorserver "github.com/m3db/m3/src/query/server"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/etcd"
xos "github.com/m3db/m3/src/x/os"
)

var (
Expand Down Expand Up @@ -62,11 +65,19 @@ func main() {
}

var (
numComponents int
dbClientCh chan client.Client
clusterClientCh chan clusterclient.Client
coordinatorDoneCh chan struct{}
)
if cfg.DB != nil {
numComponents++
}
if cfg.Coordinator != nil {
numComponents++
}

interruptCh := xos.NewInterruptChannel(numComponents)
if cfg.DB != nil {
dbClientCh = make(chan client.Client, 1)
clusterClientCh = make(chan clusterclient.Client, 1)
Expand All @@ -80,6 +91,7 @@ func main() {
DBConfig: cfg.DB,
DBClient: dbClientCh,
ClusterClient: clusterClientCh,
InterruptCh: interruptCh,
})
coordinatorDoneCh <- struct{}{}
}()
Expand All @@ -90,8 +102,15 @@ func main() {
Config: *cfg.DB,
ClientCh: dbClientCh,
ClusterClientCh: clusterClientCh,
InterruptCh: interruptCh,
})
} else if cfg.Coordinator != nil {
<-coordinatorDoneCh
}
}

func interrupt() <-chan os.Signal {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
return c
}
36 changes: 8 additions & 28 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ import (
"math"
"net/http"
"os"
"os/signal"
"path"
"runtime"
"runtime/debug"
"strings"
"syscall"
"time"

clusterclient "github.com/m3db/m3/src/cluster/client"
Expand Down Expand Up @@ -720,13 +718,13 @@ func Run(runOpts RunOptions) {

go func() {
if runOpts.BootstrapCh != nil {
// Notify on bootstrap chan if specified
// Notify on bootstrap chan if specified.
defer func() {
runOpts.BootstrapCh <- struct{}{}
}()
}

// Bootstrap asynchronously so we can handle interrupt
// Bootstrap asynchronously so we can handle interrupt.
if err := db.Bootstrap(); err != nil {
logger.Fatal("could not bootstrap database", zap.Error(err))
}
Expand All @@ -737,24 +735,12 @@ func Run(runOpts RunOptions) {
runtimeOptsMgr, cfg.WriteNewSeriesLimitPerSecond)
}()

// Handle interrupt
interruptCh := runOpts.InterruptCh
if interruptCh == nil {
// Make a noop chan so we can always select
interruptCh = make(chan error)
}

var interruptErr error
select {
case err := <-interruptCh:
interruptErr = err
case sig := <-interrupt():
interruptErr = fmt.Errorf("%v", sig)
}

logger.Warn("interrupt", zap.Error(interruptErr))
// Wait for process interrupt.
xos.WaitForInterrupt(logger, xos.InterruptOptions{
InterruptCh: runOpts.InterruptCh,
})

// Attempt graceful server close
// Attempt graceful server close.
closedCh := make(chan struct{})
go func() {
err := db.Terminate()
Expand All @@ -764,7 +750,7 @@ func Run(runOpts RunOptions) {
closedCh <- struct{}{}
}()

// Wait then close or hard close
// Wait then close or hard close.
closeTimeout := serverGracefulCloseTimeout
select {
case <-closedCh:
Expand All @@ -774,12 +760,6 @@ func Run(runOpts RunOptions) {
}
}

func interrupt() <-chan os.Signal {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
return c
}

func bgValidateProcessLimits(logger *zap.Logger) {
// If unable to validate process limits on the current configuration,
// do not run background validator task.
Expand Down
60 changes: 39 additions & 21 deletions src/dbnode/storage/series/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
Expand All @@ -48,6 +49,7 @@ import (
const (
errBucketMapCacheNotInSync = "bucket map keys do not match sorted keys cache"
errBucketMapCacheNotInSyncFmt = errBucketMapCacheNotInSync + ", blockStart: %d"
errTimestampFormat = time.RFC822Z
)

var (
Expand Down Expand Up @@ -135,7 +137,7 @@ type databaseBuffer interface {

Bootstrap(bl block.DatabaseBlock)

Reset(opts Options)
Reset(id ident.ID, opts Options)
}

type bufferStats struct {
Expand Down Expand Up @@ -203,6 +205,7 @@ func (t *OptimizedTimes) ForEach(fn func(t xtime.UnixNano)) {
}

type dbBuffer struct {
id ident.ID
opts Options
nowFn clock.NowFn

Expand Down Expand Up @@ -236,7 +239,8 @@ func newDatabaseBuffer() databaseBuffer {
return b
}

func (b *dbBuffer) Reset(opts Options) {
func (b *dbBuffer) Reset(id ident.ID, opts Options) {
b.id = id
b.opts = opts
b.nowFn = opts.ClockOptions().NowFn()
ropts := opts.RetentionOptions()
Expand All @@ -250,20 +254,6 @@ func (b *dbBuffer) Reset(opts Options) {
b.futureRetentionPeriod = ropts.FutureRetentionPeriod()
}

// ResolveWriteType returns whether a write is a cold write or warm write.
func (b *dbBuffer) ResolveWriteType(
timestamp time.Time,
now time.Time,
) WriteType {
pastLimit := now.Add(-1 * b.bufferPast)
futureLimit := now.Add(b.bufferFuture)
if !pastLimit.Before(timestamp) || !futureLimit.After(timestamp) {
return ColdWrite
}

return WarmWrite
}

func (b *dbBuffer) Write(
ctx context.Context,
timestamp time.Time,
Expand All @@ -272,14 +262,42 @@ func (b *dbBuffer) Write(
annotation []byte,
wOpts WriteOptions,
) (bool, error) {
now := b.nowFn()
writeType := b.ResolveWriteType(timestamp, now)

if writeType == ColdWrite {
var (
now = b.nowFn()
pastLimit = now.Add(-1 * b.bufferPast)
futureLimit = now.Add(b.bufferFuture)
writeType WriteType
)
switch {
case !pastLimit.Before(timestamp):
writeType = ColdWrite
if !b.coldWritesEnabled {
return false, xerrors.NewInvalidParamsError(
fmt.Errorf("datapoint too far in past: "+
"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
b.id.Bytes(), pastLimit.Sub(timestamp).String(),
timestamp.Format(errTimestampFormat),
pastLimit.Format(errTimestampFormat),
timestamp.UnixNano(), pastLimit.UnixNano()))
}
case !futureLimit.After(timestamp):
writeType = ColdWrite
if !b.coldWritesEnabled {
return false, m3dberrors.ErrColdWritesNotEnabled
return false, xerrors.NewInvalidParamsError(
fmt.Errorf("datapoint too far in future: "+
"id=%s, off_by=%s, timestamp=%s, future_limit=%s, "+
"timestamp_unix_nanos=%d, future_limit_unix_nanos=%d",
b.id.Bytes(), timestamp.Sub(futureLimit).String(),
timestamp.Format(errTimestampFormat),
futureLimit.Format(errTimestampFormat),
timestamp.UnixNano(), futureLimit.UnixNano()))
}
default:
writeType = WarmWrite
}

if writeType == ColdWrite {
if now.Add(-b.retentionPeriod).After(timestamp) {
return false, m3dberrors.ErrTooPast
}
Expand Down
8 changes: 4 additions & 4 deletions src/dbnode/storage/series/buffer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5e785e6

Please sign in to comment.