From bf81bda1654b2fa5de27264e4063a5442c1ec418 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 31 Aug 2020 19:38:55 -0400 Subject: [PATCH] [dbnode] Expose stream batch client options to config (#2576) --- docs/overview/media.md | 2 +- .../services/m3dbnode/config/config_test.go | 2 ++ src/dbnode/client/config.go | 25 ++++++++++++++++--- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/docs/overview/media.md b/docs/overview/media.md index bb8af0bcdd..b7d474b6b5 100644 --- a/docs/overview/media.md +++ b/docs/overview/media.md @@ -18,7 +18,7 @@ Recordings of all past meetups can be found on a [Vimeo M3 Community Meetings fo ## Recorded Talks -- [CNCF Webinar: Maximizing M3 – Pushing performance boundaries in a distributed metrics engine](https://www.cncf.io/webinars/cncf-member-webinar-maximizing-m3-pushing-performance-boundaries-in-a-distributed-metrics-engine-at-global-scale/) By Ryan Allen - Aug 6, 2020. +- [CNCF Webinar: Maximizing M3 – Pushing performance boundaries in a distributed metrics engine](https://www.cncf.io/webinars/maximizing-m3-pushing-performance-boundaries-in-a-distributed-metrics-engine-at-global-scale/) By Ryan Allen - Aug 6, 2020. - [OSCON 2019: Large-Scale Automated Storage on Kubernetes](https://youtu.be/N9A7xSE9n-c) By Matt Schallert - Jul 18, 2019. [Slides](https://schallert.io/OSCON%20Large-Scale%20Automated%20Storage%20on%20Kubernetes.pdf) diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 7e8522bdc2..c1c2b0a235 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -408,6 +408,8 @@ func TestConfiguration(t *testing.T) { asyncWriteMaxConcurrency: null useV2BatchAPIs: null writeTimestampOffset: null + fetchSeriesBlocksBatchConcurrency: null + fetchSeriesBlocksBatchSize: null gcPercentage: 100 writeNewSeriesLimitPerSecond: 1048576 writeNewSeriesBackoffDuration: 2ms diff --git a/src/dbnode/client/config.go b/src/dbnode/client/config.go index 5e759970d4..cea4240e87 100644 --- a/src/dbnode/client/config.go +++ b/src/dbnode/client/config.go @@ -106,6 +106,14 @@ type Configuration struct { // WriteTimestampOffset offsets all writes by specified duration into the past. WriteTimestampOffset *time.Duration `yaml:"writeTimestampOffset"` + + // FetchSeriesBlocksBatchConcurrency sets the number of batches of blocks to retrieve + // in parallel from a remote peer. Defaults to NumCPU / 2. + FetchSeriesBlocksBatchConcurrency *int `yaml:"fetchSeriesBlocksBatchConcurrency"` + + // FetchSeriesBlocksBatchSize sets the number of blocks to retrieve in a single batch + // from the remote peer. Defaults to 4096. + FetchSeriesBlocksBatchSize *int `yaml:"fetchSeriesBlocksBatchSize"` } // ProtoConfiguration is the configuration for running with ProtoDataMode enabled. @@ -411,16 +419,25 @@ func (c Configuration) NewAdminClient( v = v.SetSchemaRegistry(schemaRegistry) } - // Apply programtic custom options last + // Cast to admin options to apply admin config options. opts := v.(AdminOptions) - for _, opt := range custom { - opts = opt(opts) - } if c.WriteTimestampOffset != nil { opts = opts.SetWriteTimestampOffset(*c.WriteTimestampOffset) } + if c.FetchSeriesBlocksBatchConcurrency != nil { + opts = opts.SetFetchSeriesBlocksBatchConcurrency(*c.FetchSeriesBlocksBatchConcurrency) + } + if c.FetchSeriesBlocksBatchSize != nil { + opts = opts.SetFetchSeriesBlocksBatchSize(*c.FetchSeriesBlocksBatchSize) + } + + // Apply programmatic custom options last. + for _, opt := range custom { + opts = opt(opts) + } + asyncClusterOpts := NewOptionsForAsyncClusters(opts, asyncTopoInits, asyncClientOverrides) return NewAdminClient(opts, asyncClusterOpts...) }