Skip to content

Commit

Permalink
[coordinator] Use tag options specified in config with M3Msg ingester (
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Mar 15, 2020
1 parent 66f5bd7 commit 5289a5f
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 22 deletions.
6 changes: 5 additions & 1 deletion src/cmd/services/m3coordinator/ingest/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package ingestm3msg

import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
Expand All @@ -43,9 +44,10 @@ type Configuration struct {
// NewIngester creates an ingester with an appender.
func (cfg Configuration) NewIngester(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
) (*Ingester, error) {
opts, err := cfg.newOptions(appender, instrumentOptions)
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions)
if err != nil {
return nil, err
}
Expand All @@ -54,6 +56,7 @@ func (cfg Configuration) NewIngester(

func (cfg Configuration) newOptions(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
) (Options, error) {
scope := instrumentOptions.MetricsScope().Tagged(
Expand Down Expand Up @@ -90,6 +93,7 @@ func (cfg Configuration) newOptions(
Appender: appender,
Workers: workers,
PoolOptions: cfg.OpPool.NewObjectPoolOptions(instrumentOptions),
TagOptions: tagOptions,
TagDecoderPool: tagDecoderPool,
RetryOptions: cfg.Retry.NewOptions(scope),
Sampler: sampler,
Expand Down
5 changes: 3 additions & 2 deletions src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
Expand All @@ -54,7 +54,8 @@ func TestIngest(t *testing.T) {
},
}
appender := &mockAppender{}
ingester, err := cfg.NewIngester(appender, instrument.NewOptions())
ingester, err := cfg.NewIngester(appender, models.NewTagOptions(),
instrument.NewOptions())
require.NoError(t, err)

id := newTestID(t, "__name__", "foo", "app", "bar")
Expand Down
48 changes: 38 additions & 10 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ type RunOptions struct {
// on once it has opened.
ListenerCh chan<- net.Listener

// M3MsgListenerCh is a programmatic channel to receive the M3Msg server
// listener on once it has opened.
M3MsgListenerCh chan<- net.Listener

// DownsamplerReadyCh is a programmatic channel to receive the downsampler
// ready signal once it is open.
DownsamplerReadyCh chan<- struct{}

// CustomHandlers is a list of custom 3rd party handlers.
CustomHandlers []options.CustomHandler

Expand Down Expand Up @@ -228,8 +236,7 @@ func Run(runOpts RunOptions) {
instrumentOptions,
cfg.ReadWorkerPool,
cfg.WriteWorkerPool,
scope,
)
scope)
if err != nil {
logger.Fatal("could not create worker pools", zap.Error(err))
}
Expand Down Expand Up @@ -286,7 +293,8 @@ func Run(runOpts RunOptions) {
var cleanup cleanupFn
backendStorage, clusterClient, downsampler, cleanup, err = newM3DBStorage(
cfg, m3dbClusters, m3dbPoolWrapper,
runOpts, queryCtxOpts, tsdbOpts, instrumentOptions)
runOpts, queryCtxOpts, tsdbOpts,
runOpts.DownsamplerReadyCh, instrumentOptions)

if err != nil {
logger.Fatal("unable to setup m3db backend", zap.Error(err))
Expand Down Expand Up @@ -379,25 +387,33 @@ func Run(runOpts RunOptions) {
if cfg.Ingest != nil {
logger.Info("starting m3msg server",
zap.String("address", cfg.Ingest.M3Msg.Server.ListenAddress))
ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions)
ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage,
tagOptions, instrumentOptions)
if err != nil {
logger.Fatal("unable to create ingester", zap.Error(err))
}

server, err := cfg.Ingest.M3Msg.NewServer(
ingester.Ingest,
instrumentOptions.SetMetricsScope(scope.SubScope("ingest-m3msg")),
)

instrumentOptions.SetMetricsScope(scope.SubScope("ingest-m3msg")))
if err != nil {
logger.Fatal("unable to create m3msg server", zap.Error(err))
}

if err := server.ListenAndServe(); err != nil {
listener, err := net.Listen("tcp", cfg.Ingest.M3Msg.Server.ListenAddress)
if err != nil {
logger.Fatal("unable to open m3msg server", zap.Error(err))
}

if runOpts.M3MsgListenerCh != nil {
runOpts.M3MsgListenerCh <- listener
}

if err := server.Serve(listener); err != nil {
logger.Fatal("unable to listen on ingest server", zap.Error(err))
}

logger.Info("started m3msg server ")
logger.Info("started m3msg server", zap.Stringer("addr", listener.Addr()))
defer server.Close()
} else {
logger.Info("no m3msg server configured")
Expand Down Expand Up @@ -425,6 +441,7 @@ func newM3DBStorage(
runOpts RunOptions,
queryContextOptions models.QueryContextOptions,
tsdbOpts tsdb.Options,
downsamplerReadyCh chan<- struct{},
instrumentOptions instrument.Options,
) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) {
var (
Expand Down Expand Up @@ -486,8 +503,19 @@ func newM3DBStorage(
}

newDownsamplerFn := func() (downsample.Downsampler, error) {
return newDownsampler(cfg.Downsample, clusterClient,
downsampler, err := newDownsampler(cfg.Downsample, clusterClient,
fanoutStorage, autoMappingRules, tsdbOpts.TagOptions(), instrumentOptions)
if err != nil {
return nil, err
}

// Notify the downsampler ready channel that
// the downsampler has now been created and is ready.
if downsamplerReadyCh != nil {
downsamplerReadyCh <- struct{}{}
}

return downsampler, nil
}

if clusterClientWaitCh != nil {
Expand Down
Loading

0 comments on commit 5289a5f

Please sign in to comment.