Skip to content

Commit

Permalink
[dbnode] Remove implicit cloning of time ranges to reduce Allocs (#2178)
Browse files Browse the repository at this point in the history
* Remove implicit cloning of time ranges.

Co-authored-by: Rob Skillington <rob.skillington@gmail.com>
  • Loading branch information
notbdu and robskillington authored Mar 16, 2020
1 parent fb46ae7 commit cbb8109
Show file tree
Hide file tree
Showing 36 changed files with 594 additions and 472 deletions.
6 changes: 3 additions & 3 deletions src/dbnode/storage/bootstrap/bootstrapper/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ func (b baseBootstrapper) logSuccessAndDetermineCurrResultsUnfulfilledAndNextBoo
nextNamespace.DataRunOptions.ShardTimeRanges = dataUnfulfilled.Copy()

var (
indexCurrRequested = result.ShardTimeRanges{}
indexCurrFulfilled = result.ShardTimeRanges{}
indexUnfulfilled = result.ShardTimeRanges{}
indexCurrRequested = result.NewShardTimeRanges()
indexCurrFulfilled = result.NewShardTimeRanges()
indexUnfulfilled = result.NewShardTimeRanges()
)
if currNamespace.Metadata.Options().IndexOptions().Enabled() {
// Calculate bootstrap time ranges.
Expand Down
17 changes: 9 additions & 8 deletions src/dbnode/storage/bootstrap/bootstrapper/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func testTargetRanges() xtime.Ranges {
}

func testShardTimeRanges() result.ShardTimeRanges {
return map[uint32]xtime.Ranges{testShard: testTargetRanges()}
r := result.NewShardTimeRanges()
r.Set(testShard, testTargetRanges())
return r
}

func testResult(
Expand All @@ -78,9 +80,8 @@ func testResult(
shard uint32,
unfulfilledRange xtime.Ranges,
) bootstrap.NamespaceResults {
unfulfilled := result.ShardTimeRanges{
shard: unfulfilledRange,
}
unfulfilled := result.NewShardTimeRanges()
unfulfilled.Set(shard, unfulfilledRange)

opts := bootstrap.NamespaceResultsMapOptions{}
results := bootstrap.NewNamespaceResultsMap(opts)
Expand Down Expand Up @@ -130,7 +131,7 @@ func testBaseBootstrapperEmptyRange(t *testing.T, withIndex bool) {
src, _, base := testBaseBootstrapper(t, ctrl)
testNs := testNsMetadata(t, withIndex)

rngs := result.ShardTimeRanges{}
rngs := result.NewShardTimeRanges()
unfulfilled := xtime.NewRanges()
nsResults := testResult(testNs, withIndex, testShard, unfulfilled)
shardRangeMatcher := bootstrap.ShardTimeRangesMatcher{Ranges: rngs}
Expand Down Expand Up @@ -255,11 +256,11 @@ func testBasebootstrapperNext(

src.EXPECT().
AvailableData(testNs, targetRanges, testDefaultRunOpts).
Return(nil, nil)
Return(result.NewShardTimeRanges(), nil)
if withIndex {
src.EXPECT().
AvailableIndex(testNs, targetRanges, testDefaultRunOpts).
Return(nil, nil)
Return(result.NewShardTimeRanges(), nil)
}

tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges,
Expand All @@ -280,7 +281,7 @@ func testBasebootstrapperNext(
expected := ex.DataResult.Unfulfilled()
expectedIdx := ex.IndexResult.Unfulfilled()
if !withIndex {
expectedIdx = result.ShardTimeRanges{}
expectedIdx = result.NewShardTimeRanges()
}

tester.TestUnfulfilledForNamespace(testNs, expected, expectedIdx)
Expand Down
22 changes: 12 additions & 10 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s *commitLogSource) Read(

// NB(r): Combine all shard time ranges across data and index
// so we can do in one go.
shardTimeRanges := result.ShardTimeRanges{}
shardTimeRanges := result.NewShardTimeRanges()
shardTimeRanges.AddRanges(ns.DataRunOptions.ShardTimeRanges)
if ns.Metadata.Options().IndexOptions().Enabled() {
shardTimeRanges.AddRanges(ns.IndexRunOptions.ShardTimeRanges)
Expand Down Expand Up @@ -240,7 +240,7 @@ func (s *commitLogSource) Read(

// Start by reading any available snapshot files.
blockSize := ns.Metadata.Options().RetentionOptions().BlockSize()
for shard, tr := range shardTimeRanges {
for shard, tr := range shardTimeRanges.Iter() {
err := s.bootstrapShardSnapshots(
ns.Metadata, accumulator, shard, tr, blockSize,
mostRecentCompleteSnapshotByBlockShard)
Expand Down Expand Up @@ -495,8 +495,8 @@ func (s *commitLogSource) Read(
// NB(r): This can occur when a topology change happens then we
// bootstrap from the commit log data that the node no longer owns.
shard := seriesEntry.series.Shard
_, bootstrapping := seriesEntry.namespace.dataAndIndexShardRanges[shard]
if !bootstrapping {
_, ok = seriesEntry.namespace.dataAndIndexShardRanges.Get(shard)
if !ok {
datapointsSkippedNotBootstrappingShard++
continue
}
Expand Down Expand Up @@ -576,7 +576,7 @@ func (s *commitLogSource) snapshotFilesByShard(
shardsTimeRanges result.ShardTimeRanges,
) (map[uint32]fs.FileSetFilesSlice, error) {
snapshotFilesByShard := map[uint32]fs.FileSetFilesSlice{}
for shard := range shardsTimeRanges {
for shard := range shardsTimeRanges.Iter() {
snapshotFiles, err := s.snapshotFilesFn(filePathPrefix, nsID, shard)
if err != nil {
return nil, err
Expand Down Expand Up @@ -604,7 +604,7 @@ func (s *commitLogSource) mostRecentCompleteSnapshotByBlockShard(
)

for currBlockStart := minBlock.Truncate(blockSize); currBlockStart.Before(maxBlock); currBlockStart = currBlockStart.Add(blockSize) {
for shard := range shardsTimeRanges {
for shard := range shardsTimeRanges.Iter() {
// Anonymous func for easier clean up using defer.
func() {
var (
Expand Down Expand Up @@ -992,10 +992,10 @@ func (s *commitLogSource) availability(
) (result.ShardTimeRanges, error) {
var (
topoState = runOpts.InitialTopologyState()
availableShardTimeRanges = result.ShardTimeRanges{}
availableShardTimeRanges = result.NewShardTimeRanges()
)

for shardIDUint := range shardsTimeRanges {
for shardIDUint := range shardsTimeRanges.Iter() {
shardID := topology.ShardID(shardIDUint)
hostShardStates, ok := topoState.ShardStates[shardID]
if !ok {
Expand Down Expand Up @@ -1036,11 +1036,13 @@ func (s *commitLogSource) availability(
// to distinguish between "unfulfilled" data and "corrupt" data, then
// modify this to only say the commit log bootstrapper can fullfil
// "unfulfilled" data, but not corrupt data.
availableShardTimeRanges[shardIDUint] = shardsTimeRanges[shardIDUint]
if tr, ok := shardsTimeRanges.Get(shardIDUint); ok {
availableShardTimeRanges.Set(shardIDUint, tr)
}
case shard.Unknown:
fallthrough
default:
return result.ShardTimeRanges{}, fmt.Errorf("unknown shard state: %v", originShardState)
return result.NewShardTimeRanges(), fmt.Errorf("unknown shard state: %v", originShardState)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ func TestAvailableEmptyRangeError(t *testing.T) {
var (
opts = testDefaultOpts
src = newCommitLogSource(opts, fs.Inspection{})
res, err = src.AvailableData(testNsMetadata(t), result.ShardTimeRanges{}, testDefaultRunOpts)
res, err = src.AvailableData(testNsMetadata(t), result.NewShardTimeRanges(), testDefaultRunOpts)
)
require.NoError(t, err)
require.True(t, result.ShardTimeRanges{}.Equal(res))
require.True(t, result.NewShardTimeRanges().Equal(res))
}

func TestReadEmpty(t *testing.T) {
opts := testDefaultOpts

src := newCommitLogSource(opts, fs.Inspection{})
md := testNsMetadata(t)
target := result.ShardTimeRanges{}
target := result.NewShardTimeRanges()
tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, md)
defer tester.Finish()

Expand All @@ -97,14 +97,10 @@ func TestReadErrorOnNewIteratorError(t *testing.T) {
return nil, nil, errors.New("an error")
}

ranges := xtime.Ranges{}
ranges = ranges.AddRange(xtime.Range{
Start: time.Now(),
End: time.Now().Add(time.Hour),
})
ranges := xtime.NewRanges(xtime.Range{Start: time.Now(), End: time.Now().Add(time.Hour)})

md := testNsMetadata(t)
target := result.ShardTimeRanges{0: ranges}
target := result.NewShardTimeRanges().Set(0, ranges)
tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, md)
defer tester.Finish()

Expand All @@ -131,11 +127,7 @@ func testReadOrderedValues(t *testing.T, opts Options, md namespace.Metadata, se
start := now.Truncate(blockSize).Add(-blockSize)
end := now.Truncate(blockSize)

ranges := xtime.Ranges{}
ranges = ranges.AddRange(xtime.Range{
Start: start,
End: end,
})
ranges := xtime.NewRanges(xtime.Range{Start: start, End: end})

foo := ts.Series{Namespace: nsCtx.ID, Shard: 0, ID: ident.StringID("foo")}
bar := ts.Series{Namespace: nsCtx.ID, Shard: 1, ID: ident.StringID("bar")}
Expand All @@ -159,7 +151,7 @@ func testReadOrderedValues(t *testing.T, opts Options, md namespace.Metadata, se
return newTestCommitLogIterator(values, nil), nil, nil
}

targetRanges := result.ShardTimeRanges{0: ranges, 1: ranges}
targetRanges := result.NewShardTimeRanges().Set(0, ranges).Set(1, ranges)
tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md)
defer tester.Finish()

Expand Down Expand Up @@ -187,11 +179,7 @@ func testReadUnorderedValues(t *testing.T, opts Options, md namespace.Metadata,
start := now.Truncate(blockSize).Add(-blockSize)
end := now.Truncate(blockSize)

ranges := xtime.Ranges{}
ranges = ranges.AddRange(xtime.Range{
Start: start,
End: end,
})
ranges := xtime.NewRanges(xtime.Range{Start: start, End: end})

foo := ts.Series{Namespace: nsCtx.ID, Shard: 0, ID: ident.StringID("foo")}

Expand All @@ -212,7 +200,7 @@ func testReadUnorderedValues(t *testing.T, opts Options, md namespace.Metadata,
return newTestCommitLogIterator(values, nil), nil, nil
}

targetRanges := result.ShardTimeRanges{0: ranges, 1: ranges}
targetRanges := result.NewShardTimeRanges().Set(0, ranges).Set(1, ranges)
tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md)
defer tester.Finish()

Expand Down Expand Up @@ -243,11 +231,7 @@ func TestReadHandlesDifferentSeriesWithIdenticalUniqueIndex(t *testing.T) {
start := now.Truncate(blockSize).Add(-blockSize)
end := now.Truncate(blockSize)

ranges := xtime.Ranges{}
ranges = ranges.AddRange(xtime.Range{
Start: start,
End: end,
})
ranges := xtime.NewRanges(xtime.Range{Start: start, End: end})

// All series need to be in the same shard to exercise the regression.
foo := ts.Series{
Expand All @@ -274,7 +258,7 @@ func TestReadHandlesDifferentSeriesWithIdenticalUniqueIndex(t *testing.T) {
return newTestCommitLogIterator(values, nil), nil, nil
}

targetRanges := result.ShardTimeRanges{0: ranges, 1: ranges}
targetRanges := result.NewShardTimeRanges().Set(0, ranges).Set(1, ranges)
tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md)
defer tester.Finish()

Expand Down Expand Up @@ -306,7 +290,7 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options,
now = time.Now()
start = now.Truncate(blockSize).Add(-blockSize)
end = now.Truncate(blockSize)
ranges = xtime.Ranges{}
ranges = xtime.NewRanges()

foo = ts.Series{Namespace: nsCtx.ID, Shard: 0, ID: ident.StringID("foo")}
commitLogValues = testValues{
Expand All @@ -319,7 +303,7 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options,
commitLogValues = setAnn(commitLogValues)
}

ranges = ranges.AddRange(xtime.Range{
ranges.AddRange(xtime.Range{
Start: start,
End: end,
})
Expand Down Expand Up @@ -411,7 +395,7 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options,
return mockReader, nil
}

targetRanges := result.ShardTimeRanges{0: ranges}
targetRanges := result.NewShardTimeRanges().Set(0, ranges)
tester := bootstrap.BuildNamespacesTesterWithReaderIteratorPool(
t,
testDefaultRunOpts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,25 @@ func TestBootstrapIndex(t *testing.T) {
return newTestCommitLogIterator(values, nil), nil, nil
}

ranges := xtime.Ranges{}
ranges = ranges.AddRange(xtime.Range{
Start: start,
End: start.Add(dataBlockSize),
})
ranges = ranges.AddRange(xtime.Range{
Start: start.Add(dataBlockSize),
End: start.Add(2 * dataBlockSize),
})
ranges = ranges.AddRange(xtime.Range{
Start: start.Add(2 * dataBlockSize),
End: start.Add(3 * dataBlockSize),
})
ranges := xtime.NewRanges(
xtime.Range{Start: start, End: start.Add(dataBlockSize)},
xtime.Range{Start: start.Add(dataBlockSize), End: start.Add(2 * dataBlockSize)},
xtime.Range{Start: start.Add(2 * dataBlockSize), End: start.Add(3 * dataBlockSize)})

// Don't include ranges for shard 4 as thats how we're testing the noShardBootstrapRange series.
targetRanges := result.ShardTimeRanges{
shardn(0): ranges, shardn(1): ranges, shardn(2): ranges, shardn(5): ranges}
targetRanges := result.NewShardTimeRanges().Set(
shardn(0),
ranges,
).Set(
shardn(1),
ranges,
).Set(
shardn(2),
ranges,
).Set(
shardn(5),
ranges,
)

tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md1, md2, md3)
defer tester.Finish()
Expand Down Expand Up @@ -220,7 +222,7 @@ func TestBootstrapIndexEmptyShardTimeRanges(t *testing.T) {
return newTestCommitLogIterator(values, nil), nil, nil
}

target := result.ShardTimeRanges{}
target := result.NewShardTimeRanges()
tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, md)
defer tester.Finish()

Expand Down Expand Up @@ -382,23 +384,25 @@ func TestBootstrapIndexFailsForDecodedTags(t *testing.T) {
return newTestCommitLogIterator(values, nil), nil, nil
}

ranges := xtime.Ranges{}
ranges = ranges.AddRange(xtime.Range{
Start: start,
End: start.Add(dataBlockSize),
})
ranges = ranges.AddRange(xtime.Range{
Start: start.Add(dataBlockSize),
End: start.Add(2 * dataBlockSize),
})
ranges = ranges.AddRange(xtime.Range{
Start: start.Add(2 * dataBlockSize),
End: start.Add(3 * dataBlockSize),
})
ranges := xtime.NewRanges(
xtime.Range{Start: start, End: start.Add(dataBlockSize)},
xtime.Range{Start: start.Add(dataBlockSize), End: start.Add(2 * dataBlockSize)},
xtime.Range{Start: start.Add(2 * dataBlockSize), End: start.Add(3 * dataBlockSize)})

// Don't include ranges for shard 4 as thats how we're testing the noShardBootstrapRange series.
targetRanges := result.ShardTimeRanges{
shardn(0): ranges, shardn(1): ranges, shardn(2): ranges, shardn(5): ranges}
targetRanges := result.NewShardTimeRanges().Set(
shardn(0),
ranges,
).Set(
shardn(1),
ranges,
).Set(
shardn(2),
ranges,
).Set(
shardn(5),
ranges,
)

tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md1)
defer tester.Finish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,7 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) {

// Determine time range to bootstrap
end := input.currentTime.Add(blockSize)
ranges := xtime.Ranges{}
ranges = ranges.AddRange(xtime.Range{
Start: start,
End: end,
})
ranges := xtime.NewRanges(xtime.Range{Start: start, End: end})

// Determine which shards we need to bootstrap (based on the randomly
// generated data)
Expand All @@ -352,9 +348,9 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) {
}

// Assign the previously-determined bootstrap range to each known shard
shardTimeRanges := result.ShardTimeRanges{}
shardTimeRanges := result.NewShardTimeRanges()
for shard := range allShardsMap {
shardTimeRanges[shard] = ranges
shardTimeRanges.Set(shard, ranges)
}

// Perform the bootstrap
Expand Down
Loading

0 comments on commit cbb8109

Please sign in to comment.