Skip to content

Commit

Permalink
[dbnode] Expose stream batch client options to config (#2576)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Aug 31, 2020
1 parent 685baf7 commit bf81bda
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/overview/media.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Recordings of all past meetups can be found on a [Vimeo M3 Community Meetings fo

## Recorded Talks

- [CNCF Webinar: Maximizing M3 – Pushing performance boundaries in a distributed metrics engine](https://www.cncf.io/webinars/cncf-member-webinar-maximizing-m3-pushing-performance-boundaries-in-a-distributed-metrics-engine-at-global-scale/) By Ryan Allen - Aug 6, 2020.
- [CNCF Webinar: Maximizing M3 – Pushing performance boundaries in a distributed metrics engine](https://www.cncf.io/webinars/maximizing-m3-pushing-performance-boundaries-in-a-distributed-metrics-engine-at-global-scale/) By Ryan Allen - Aug 6, 2020.

- [OSCON 2019: Large-Scale Automated Storage on Kubernetes](https://youtu.be/N9A7xSE9n-c) By Matt Schallert - Jul 18, 2019. [Slides](https://schallert.io/OSCON%20Large-Scale%20Automated%20Storage%20on%20Kubernetes.pdf)

Expand Down
2 changes: 2 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ func TestConfiguration(t *testing.T) {
asyncWriteMaxConcurrency: null
useV2BatchAPIs: null
writeTimestampOffset: null
fetchSeriesBlocksBatchConcurrency: null
fetchSeriesBlocksBatchSize: null
gcPercentage: 100
writeNewSeriesLimitPerSecond: 1048576
writeNewSeriesBackoffDuration: 2ms
Expand Down
25 changes: 21 additions & 4 deletions src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ type Configuration struct {

// WriteTimestampOffset offsets all writes by specified duration into the past.
WriteTimestampOffset *time.Duration `yaml:"writeTimestampOffset"`

// FetchSeriesBlocksBatchConcurrency sets the number of batches of blocks to retrieve
// in parallel from a remote peer. Defaults to NumCPU / 2.
FetchSeriesBlocksBatchConcurrency *int `yaml:"fetchSeriesBlocksBatchConcurrency"`

// FetchSeriesBlocksBatchSize sets the number of blocks to retrieve in a single batch
// from the remote peer. Defaults to 4096.
FetchSeriesBlocksBatchSize *int `yaml:"fetchSeriesBlocksBatchSize"`
}

// ProtoConfiguration is the configuration for running with ProtoDataMode enabled.
Expand Down Expand Up @@ -411,16 +419,25 @@ func (c Configuration) NewAdminClient(
v = v.SetSchemaRegistry(schemaRegistry)
}

// Apply programtic custom options last
// Cast to admin options to apply admin config options.
opts := v.(AdminOptions)
for _, opt := range custom {
opts = opt(opts)
}

if c.WriteTimestampOffset != nil {
opts = opts.SetWriteTimestampOffset(*c.WriteTimestampOffset)
}

if c.FetchSeriesBlocksBatchConcurrency != nil {
opts = opts.SetFetchSeriesBlocksBatchConcurrency(*c.FetchSeriesBlocksBatchConcurrency)
}
if c.FetchSeriesBlocksBatchSize != nil {
opts = opts.SetFetchSeriesBlocksBatchSize(*c.FetchSeriesBlocksBatchSize)
}

// Apply programmatic custom options last.
for _, opt := range custom {
opts = opt(opts)
}

asyncClusterOpts := NewOptionsForAsyncClusters(opts, asyncTopoInits, asyncClientOverrides)
return NewAdminClient(opts, asyncClusterOpts...)
}

0 comments on commit bf81bda

Please sign in to comment.