diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base.go b/src/dbnode/storage/bootstrap/bootstrapper/base.go index f1df77cbff..0ae3c0cd58 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base.go @@ -240,9 +240,9 @@ func (b baseBootstrapper) logSuccessAndDetermineCurrResultsUnfulfilledAndNextBoo nextNamespace.DataRunOptions.ShardTimeRanges = dataUnfulfilled.Copy() var ( - indexCurrRequested = result.ShardTimeRanges{} - indexCurrFulfilled = result.ShardTimeRanges{} - indexUnfulfilled = result.ShardTimeRanges{} + indexCurrRequested = result.NewShardTimeRanges() + indexCurrFulfilled = result.NewShardTimeRanges() + indexUnfulfilled = result.NewShardTimeRanges() ) if currNamespace.Metadata.Options().IndexOptions().Enabled() { // Calculate bootstrap time ranges. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go index 71eb94f8e2..fcd9fb131e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go @@ -69,7 +69,9 @@ func testTargetRanges() xtime.Ranges { } func testShardTimeRanges() result.ShardTimeRanges { - return map[uint32]xtime.Ranges{testShard: testTargetRanges()} + r := result.NewShardTimeRanges() + r.Set(testShard, testTargetRanges()) + return r } func testResult( @@ -78,9 +80,8 @@ func testResult( shard uint32, unfulfilledRange xtime.Ranges, ) bootstrap.NamespaceResults { - unfulfilled := result.ShardTimeRanges{ - shard: unfulfilledRange, - } + unfulfilled := result.NewShardTimeRanges() + unfulfilled.Set(shard, unfulfilledRange) opts := bootstrap.NamespaceResultsMapOptions{} results := bootstrap.NewNamespaceResultsMap(opts) @@ -130,7 +131,7 @@ func testBaseBootstrapperEmptyRange(t *testing.T, withIndex bool) { src, _, base := testBaseBootstrapper(t, ctrl) testNs := testNsMetadata(t, withIndex) - rngs := result.ShardTimeRanges{} + rngs := result.NewShardTimeRanges() unfulfilled := xtime.NewRanges() nsResults := testResult(testNs, withIndex, testShard, unfulfilled) shardRangeMatcher := bootstrap.ShardTimeRangesMatcher{Ranges: rngs} @@ -255,11 +256,11 @@ func testBasebootstrapperNext( src.EXPECT(). AvailableData(testNs, targetRanges, testDefaultRunOpts). - Return(nil, nil) + Return(result.NewShardTimeRanges(), nil) if withIndex { src.EXPECT(). AvailableIndex(testNs, targetRanges, testDefaultRunOpts). - Return(nil, nil) + Return(result.NewShardTimeRanges(), nil) } tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, @@ -280,7 +281,7 @@ func testBasebootstrapperNext( expected := ex.DataResult.Unfulfilled() expectedIdx := ex.IndexResult.Unfulfilled() if !withIndex { - expectedIdx = result.ShardTimeRanges{} + expectedIdx = result.NewShardTimeRanges() } tester.TestUnfulfilledForNamespace(testNs, expected, expectedIdx) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index c2f8c946e5..edc1c1c01e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -208,7 +208,7 @@ func (s *commitLogSource) Read( // NB(r): Combine all shard time ranges across data and index // so we can do in one go. - shardTimeRanges := result.ShardTimeRanges{} + shardTimeRanges := result.NewShardTimeRanges() shardTimeRanges.AddRanges(ns.DataRunOptions.ShardTimeRanges) if ns.Metadata.Options().IndexOptions().Enabled() { shardTimeRanges.AddRanges(ns.IndexRunOptions.ShardTimeRanges) @@ -240,7 +240,7 @@ func (s *commitLogSource) Read( // Start by reading any available snapshot files. blockSize := ns.Metadata.Options().RetentionOptions().BlockSize() - for shard, tr := range shardTimeRanges { + for shard, tr := range shardTimeRanges.Iter() { err := s.bootstrapShardSnapshots( ns.Metadata, accumulator, shard, tr, blockSize, mostRecentCompleteSnapshotByBlockShard) @@ -495,8 +495,8 @@ func (s *commitLogSource) Read( // NB(r): This can occur when a topology change happens then we // bootstrap from the commit log data that the node no longer owns. shard := seriesEntry.series.Shard - _, bootstrapping := seriesEntry.namespace.dataAndIndexShardRanges[shard] - if !bootstrapping { + _, ok = seriesEntry.namespace.dataAndIndexShardRanges.Get(shard) + if !ok { datapointsSkippedNotBootstrappingShard++ continue } @@ -576,7 +576,7 @@ func (s *commitLogSource) snapshotFilesByShard( shardsTimeRanges result.ShardTimeRanges, ) (map[uint32]fs.FileSetFilesSlice, error) { snapshotFilesByShard := map[uint32]fs.FileSetFilesSlice{} - for shard := range shardsTimeRanges { + for shard := range shardsTimeRanges.Iter() { snapshotFiles, err := s.snapshotFilesFn(filePathPrefix, nsID, shard) if err != nil { return nil, err @@ -604,7 +604,7 @@ func (s *commitLogSource) mostRecentCompleteSnapshotByBlockShard( ) for currBlockStart := minBlock.Truncate(blockSize); currBlockStart.Before(maxBlock); currBlockStart = currBlockStart.Add(blockSize) { - for shard := range shardsTimeRanges { + for shard := range shardsTimeRanges.Iter() { // Anonymous func for easier clean up using defer. func() { var ( @@ -992,10 +992,10 @@ func (s *commitLogSource) availability( ) (result.ShardTimeRanges, error) { var ( topoState = runOpts.InitialTopologyState() - availableShardTimeRanges = result.ShardTimeRanges{} + availableShardTimeRanges = result.NewShardTimeRanges() ) - for shardIDUint := range shardsTimeRanges { + for shardIDUint := range shardsTimeRanges.Iter() { shardID := topology.ShardID(shardIDUint) hostShardStates, ok := topoState.ShardStates[shardID] if !ok { @@ -1036,11 +1036,13 @@ func (s *commitLogSource) availability( // to distinguish between "unfulfilled" data and "corrupt" data, then // modify this to only say the commit log bootstrapper can fullfil // "unfulfilled" data, but not corrupt data. - availableShardTimeRanges[shardIDUint] = shardsTimeRanges[shardIDUint] + if tr, ok := shardsTimeRanges.Get(shardIDUint); ok { + availableShardTimeRanges.Set(shardIDUint, tr) + } case shard.Unknown: fallthrough default: - return result.ShardTimeRanges{}, fmt.Errorf("unknown shard state: %v", originShardState) + return result.NewShardTimeRanges(), fmt.Errorf("unknown shard state: %v", originShardState) } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index 94a73fcbd2..8b2d272674 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -62,10 +62,10 @@ func TestAvailableEmptyRangeError(t *testing.T) { var ( opts = testDefaultOpts src = newCommitLogSource(opts, fs.Inspection{}) - res, err = src.AvailableData(testNsMetadata(t), result.ShardTimeRanges{}, testDefaultRunOpts) + res, err = src.AvailableData(testNsMetadata(t), result.NewShardTimeRanges(), testDefaultRunOpts) ) require.NoError(t, err) - require.True(t, result.ShardTimeRanges{}.Equal(res)) + require.True(t, result.NewShardTimeRanges().Equal(res)) } func TestReadEmpty(t *testing.T) { @@ -73,7 +73,7 @@ func TestReadEmpty(t *testing.T) { src := newCommitLogSource(opts, fs.Inspection{}) md := testNsMetadata(t) - target := result.ShardTimeRanges{} + target := result.NewShardTimeRanges() tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, md) defer tester.Finish() @@ -97,14 +97,10 @@ func TestReadErrorOnNewIteratorError(t *testing.T) { return nil, nil, errors.New("an error") } - ranges := xtime.Ranges{} - ranges = ranges.AddRange(xtime.Range{ - Start: time.Now(), - End: time.Now().Add(time.Hour), - }) + ranges := xtime.NewRanges(xtime.Range{Start: time.Now(), End: time.Now().Add(time.Hour)}) md := testNsMetadata(t) - target := result.ShardTimeRanges{0: ranges} + target := result.NewShardTimeRanges().Set(0, ranges) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, md) defer tester.Finish() @@ -131,11 +127,7 @@ func testReadOrderedValues(t *testing.T, opts Options, md namespace.Metadata, se start := now.Truncate(blockSize).Add(-blockSize) end := now.Truncate(blockSize) - ranges := xtime.Ranges{} - ranges = ranges.AddRange(xtime.Range{ - Start: start, - End: end, - }) + ranges := xtime.NewRanges(xtime.Range{Start: start, End: end}) foo := ts.Series{Namespace: nsCtx.ID, Shard: 0, ID: ident.StringID("foo")} bar := ts.Series{Namespace: nsCtx.ID, Shard: 1, ID: ident.StringID("bar")} @@ -159,7 +151,7 @@ func testReadOrderedValues(t *testing.T, opts Options, md namespace.Metadata, se return newTestCommitLogIterator(values, nil), nil, nil } - targetRanges := result.ShardTimeRanges{0: ranges, 1: ranges} + targetRanges := result.NewShardTimeRanges().Set(0, ranges).Set(1, ranges) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md) defer tester.Finish() @@ -187,11 +179,7 @@ func testReadUnorderedValues(t *testing.T, opts Options, md namespace.Metadata, start := now.Truncate(blockSize).Add(-blockSize) end := now.Truncate(blockSize) - ranges := xtime.Ranges{} - ranges = ranges.AddRange(xtime.Range{ - Start: start, - End: end, - }) + ranges := xtime.NewRanges(xtime.Range{Start: start, End: end}) foo := ts.Series{Namespace: nsCtx.ID, Shard: 0, ID: ident.StringID("foo")} @@ -212,7 +200,7 @@ func testReadUnorderedValues(t *testing.T, opts Options, md namespace.Metadata, return newTestCommitLogIterator(values, nil), nil, nil } - targetRanges := result.ShardTimeRanges{0: ranges, 1: ranges} + targetRanges := result.NewShardTimeRanges().Set(0, ranges).Set(1, ranges) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md) defer tester.Finish() @@ -243,11 +231,7 @@ func TestReadHandlesDifferentSeriesWithIdenticalUniqueIndex(t *testing.T) { start := now.Truncate(blockSize).Add(-blockSize) end := now.Truncate(blockSize) - ranges := xtime.Ranges{} - ranges = ranges.AddRange(xtime.Range{ - Start: start, - End: end, - }) + ranges := xtime.NewRanges(xtime.Range{Start: start, End: end}) // All series need to be in the same shard to exercise the regression. foo := ts.Series{ @@ -274,7 +258,7 @@ func TestReadHandlesDifferentSeriesWithIdenticalUniqueIndex(t *testing.T) { return newTestCommitLogIterator(values, nil), nil, nil } - targetRanges := result.ShardTimeRanges{0: ranges, 1: ranges} + targetRanges := result.NewShardTimeRanges().Set(0, ranges).Set(1, ranges) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md) defer tester.Finish() @@ -306,7 +290,7 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options, now = time.Now() start = now.Truncate(blockSize).Add(-blockSize) end = now.Truncate(blockSize) - ranges = xtime.Ranges{} + ranges = xtime.NewRanges() foo = ts.Series{Namespace: nsCtx.ID, Shard: 0, ID: ident.StringID("foo")} commitLogValues = testValues{ @@ -319,7 +303,7 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options, commitLogValues = setAnn(commitLogValues) } - ranges = ranges.AddRange(xtime.Range{ + ranges.AddRange(xtime.Range{ Start: start, End: end, }) @@ -411,7 +395,7 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options, return mockReader, nil } - targetRanges := result.ShardTimeRanges{0: ranges} + targetRanges := result.NewShardTimeRanges().Set(0, ranges) tester := bootstrap.BuildNamespacesTesterWithReaderIteratorPool( t, testDefaultRunOpts, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go index b006595faf..506901ba8d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go @@ -156,23 +156,25 @@ func TestBootstrapIndex(t *testing.T) { return newTestCommitLogIterator(values, nil), nil, nil } - ranges := xtime.Ranges{} - ranges = ranges.AddRange(xtime.Range{ - Start: start, - End: start.Add(dataBlockSize), - }) - ranges = ranges.AddRange(xtime.Range{ - Start: start.Add(dataBlockSize), - End: start.Add(2 * dataBlockSize), - }) - ranges = ranges.AddRange(xtime.Range{ - Start: start.Add(2 * dataBlockSize), - End: start.Add(3 * dataBlockSize), - }) + ranges := xtime.NewRanges( + xtime.Range{Start: start, End: start.Add(dataBlockSize)}, + xtime.Range{Start: start.Add(dataBlockSize), End: start.Add(2 * dataBlockSize)}, + xtime.Range{Start: start.Add(2 * dataBlockSize), End: start.Add(3 * dataBlockSize)}) // Don't include ranges for shard 4 as thats how we're testing the noShardBootstrapRange series. - targetRanges := result.ShardTimeRanges{ - shardn(0): ranges, shardn(1): ranges, shardn(2): ranges, shardn(5): ranges} + targetRanges := result.NewShardTimeRanges().Set( + shardn(0), + ranges, + ).Set( + shardn(1), + ranges, + ).Set( + shardn(2), + ranges, + ).Set( + shardn(5), + ranges, + ) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md1, md2, md3) defer tester.Finish() @@ -220,7 +222,7 @@ func TestBootstrapIndexEmptyShardTimeRanges(t *testing.T) { return newTestCommitLogIterator(values, nil), nil, nil } - target := result.ShardTimeRanges{} + target := result.NewShardTimeRanges() tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, md) defer tester.Finish() @@ -382,23 +384,25 @@ func TestBootstrapIndexFailsForDecodedTags(t *testing.T) { return newTestCommitLogIterator(values, nil), nil, nil } - ranges := xtime.Ranges{} - ranges = ranges.AddRange(xtime.Range{ - Start: start, - End: start.Add(dataBlockSize), - }) - ranges = ranges.AddRange(xtime.Range{ - Start: start.Add(dataBlockSize), - End: start.Add(2 * dataBlockSize), - }) - ranges = ranges.AddRange(xtime.Range{ - Start: start.Add(2 * dataBlockSize), - End: start.Add(3 * dataBlockSize), - }) + ranges := xtime.NewRanges( + xtime.Range{Start: start, End: start.Add(dataBlockSize)}, + xtime.Range{Start: start.Add(dataBlockSize), End: start.Add(2 * dataBlockSize)}, + xtime.Range{Start: start.Add(2 * dataBlockSize), End: start.Add(3 * dataBlockSize)}) // Don't include ranges for shard 4 as thats how we're testing the noShardBootstrapRange series. - targetRanges := result.ShardTimeRanges{ - shardn(0): ranges, shardn(1): ranges, shardn(2): ranges, shardn(5): ranges} + targetRanges := result.NewShardTimeRanges().Set( + shardn(0), + ranges, + ).Set( + shardn(1), + ranges, + ).Set( + shardn(2), + ranges, + ).Set( + shardn(5), + ranges, + ) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, targetRanges, md1) defer tester.Finish() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go index 0d38a6e4b5..529e5bd7db 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go @@ -331,11 +331,7 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) { // Determine time range to bootstrap end := input.currentTime.Add(blockSize) - ranges := xtime.Ranges{} - ranges = ranges.AddRange(xtime.Range{ - Start: start, - End: end, - }) + ranges := xtime.NewRanges(xtime.Range{Start: start, End: end}) // Determine which shards we need to bootstrap (based on the randomly // generated data) @@ -352,9 +348,9 @@ func TestCommitLogSourcePropCorrectlyBootstrapsFromCommitlog(t *testing.T) { } // Assign the previously-determined bootstrap range to each known shard - shardTimeRanges := result.ShardTimeRanges{} + shardTimeRanges := result.NewShardTimeRanges() for shard := range allShardsMap { - shardTimeRanges[shard] = ranges + shardTimeRanges.Set(shard, ranges) } // Perform the bootstrap diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go index b4ad4fd081..03c92301b9 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go @@ -48,15 +48,15 @@ func TestAvailableData(t *testing.T) { blockSize = 2 * time.Hour numShards = uint32(4) blockStart = time.Now().Truncate(blockSize) - shardTimeRangesToBootstrap = result.ShardTimeRanges{} - bootstrapRanges = xtime.Ranges{}.AddRange(xtime.Range{ + shardTimeRangesToBootstrap = result.NewShardTimeRanges() + bootstrapRanges = xtime.NewRanges(xtime.Range{ Start: blockStart, End: blockStart.Add(blockSize), }) ) for i := 0; i < int(numShards); i++ { - shardTimeRangesToBootstrap[uint32(i)] = bootstrapRanges + shardTimeRangesToBootstrap.Set(uint32(i), bootstrapRanges) } testCases := []struct { @@ -72,7 +72,7 @@ func TestAvailableData(t *testing.T) { tu.SelfID: tu.ShardsRange(0, numShards, shard.Initializing), }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, { title: "Single node - Shard unknown", @@ -80,7 +80,7 @@ func TestAvailableData(t *testing.T) { tu.SelfID: tu.ShardsRange(0, numShards, shard.Unknown), }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), expectedErr: errors.New("unknown shard state: Unknown"), }, { @@ -115,7 +115,7 @@ func TestAvailableData(t *testing.T) { notSelfID: tu.ShardsRange(0, numShards, shard.Available), }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index d9d4447242..017f1ed78e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -210,9 +210,9 @@ func (s *fileSystemSource) availability( md namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, ) (result.ShardTimeRanges, error) { - result := make(map[uint32]xtime.Ranges, len(shardsTimeRanges)) - for shard, ranges := range shardsTimeRanges { - result[shard] = s.shardAvailability(md.ID(), shard, ranges) + result := result.NewShardTimeRangesFromSize(shardsTimeRanges.Len()) + for shard, ranges := range shardsTimeRanges.Iter() { + result.Set(shard, s.shardAvailability(md.ID(), shard, ranges)) } return result, nil } @@ -223,13 +223,13 @@ func (s *fileSystemSource) shardAvailability( targetRangesForShard xtime.Ranges, ) xtime.Ranges { if targetRangesForShard.IsEmpty() { - return xtime.Ranges{} + return xtime.NewRanges() } readInfoFilesResults := fs.ReadInfoFiles(s.fsopts.FilePathPrefix(), namespace, shard, s.fsopts.InfoReaderBufferSize(), s.fsopts.DecodingOptions()) - var tr xtime.Ranges + tr := xtime.NewRanges() for i := 0; i < len(readInfoFilesResults); i++ { result := readInfoFilesResults[i] if err := result.Err.Error(); err != nil { @@ -247,7 +247,7 @@ func (s *fileSystemSource) shardAvailability( w := time.Duration(info.BlockSize) currRange := xtime.Range{Start: t, End: t.Add(w)} if targetRangesForShard.Overlaps(currRange) { - tr = tr.AddRange(currRange) + tr.AddRange(currRange) } } return tr @@ -334,7 +334,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( docsPool = s.opts.IndexOptions().DocumentArrayPool() batch = docsPool.Get() totalEntries int - totalFulfilledRanges = result.ShardTimeRanges{} + totalFulfilledRanges = result.NewShardTimeRanges() ) defer docsPool.Put(batch) @@ -415,9 +415,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( if err == nil && run == bootstrapIndexRunType { // Mark index block as fulfilled. - fulfilled := result.ShardTimeRanges{ - shard: xtime.Ranges{}.AddRange(timeRange), - } + fulfilled := result.NewShardTimeRanges().Set(shard, xtime.NewRanges(timeRange)) err = runResult.index.IndexResults().MarkFulfilled(start, fulfilled, ns.Options().IndexOptions()) if err != nil { @@ -428,9 +426,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } if err == nil { - fulfilled := result.ShardTimeRanges{ - shard: xtime.Ranges{}.AddRange(timeRange), - } + fulfilled := result.NewShardTimeRanges().Set(shard, xtime.NewRanges(timeRange)) totalFulfilledRanges.AddRanges(fulfilled) remainingRanges.Subtract(fulfilled) } else { @@ -463,7 +459,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( min, max = requestedRanges.MinMax() iopts = s.opts.ResultOptions().InstrumentOptions() ) - for _, remainingRange := range remainingRanges { + for _, remainingRange := range remainingRanges.Iter() { if remainingRange.Overlaps(initialIndexRange) { overlapsWithInitalIndexRange = true } @@ -717,16 +713,18 @@ func (s *fileSystemSource) bootstrapDataRunResultFromAvailability( ) *runResult { runResult := newRunResult() unfulfilled := runResult.data.Unfulfilled() - for shard, ranges := range shardsTimeRanges { + for shard, ranges := range shardsTimeRanges.Iter() { if ranges.IsEmpty() { continue } availability := s.shardAvailability(md.ID(), shard, ranges) - remaining := ranges.RemoveRanges(availability) + remaining := ranges.Clone() + remaining.RemoveRanges(availability) if !remaining.IsEmpty() { - unfulfilled.AddRanges(result.ShardTimeRanges{ - shard: remaining, - }) + unfulfilled.AddRanges(result.NewShardTimeRanges().Set( + shard, + remaining, + )) } } runResult.data.SetUnfulfilled(unfulfilled) @@ -743,7 +741,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( shardsTimeRanges result.ShardTimeRanges, ) (bootstrapFromIndexPersistedBlocksResult, error) { res := bootstrapFromIndexPersistedBlocksResult{ - fulfilled: result.ShardTimeRanges{}, + fulfilled: result.NewShardTimeRanges(), } indexBlockSize := ns.Options().IndexOptions().BlockSize() @@ -767,13 +765,16 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( Start: indexBlockStart, End: indexBlockStart.Add(indexBlockSize), } - willFulfill := result.ShardTimeRanges{} + willFulfill := result.NewShardTimeRanges() for _, shard := range info.Shards { - tr, ok := shardsTimeRanges[shard] + tr, ok := shardsTimeRanges.Get(shard) if !ok { // No ranges match for this shard. continue } + if _, ok := willFulfill.Get(shard); !ok { + willFulfill.Set(shard, xtime.NewRanges()) + } iter := tr.Iter() for iter.Next() { @@ -782,7 +783,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( if !intersects { continue } - willFulfill[shard] = willFulfill[shard].AddRange(intersection) + willFulfill.GetOrAdd(shard).AddRange(intersection) } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 2a68ed358a..7bff19b3f0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -172,7 +172,7 @@ func testTimeRanges() xtime.Ranges { } func testShardTimeRanges() result.ShardTimeRanges { - return map[uint32]xtime.Ranges{testShard: testTimeRanges()} + return result.NewShardTimeRanges().Set(testShard, testTimeRanges()) } func testBootstrappingIndexShardTimeRanges() result.ShardTimeRanges { @@ -180,12 +180,13 @@ func testBootstrappingIndexShardTimeRanges() result.ShardTimeRanges { // `testBlockSize` values should be fulfilled in the index block. This is // `testBlockSize` rather than `testIndexSize` since the files generated // by this test use 2 hour (which is `testBlockSize`) reader blocks. - return map[uint32]xtime.Ranges{ - testShard: xtime.Ranges{}.AddRange(xtime.Range{ + return result.NewShardTimeRanges().Set( + testShard, + xtime.NewRanges(xtime.Range{ Start: testStart.Add(testBlockSize), End: testStart.Add(11 * time.Hour), }), - } + ) } func writeGoodFiles(t *testing.T, dir string, namespace ident.ID, shard uint32) { @@ -284,10 +285,13 @@ func sortedTagsFromTagsMap(tags map[string]string) ident.Tags { func validateTimeRanges(t *testing.T, tr xtime.Ranges, expected xtime.Ranges) { // Make range eclipses expected - require.True(t, expected.RemoveRanges(tr).IsEmpty()) + expectedWithRemovedRanges := expected.Clone() + expectedWithRemovedRanges.RemoveRanges(tr) + require.True(t, expectedWithRemovedRanges.IsEmpty()) // Now make sure no ranges outside of expected - expectedWithAddedRanges := expected.AddRanges(tr) + expectedWithAddedRanges := expected.Clone() + expectedWithAddedRanges.AddRanges(tr) require.Equal(t, expected.Len(), expectedWithAddedRanges.Len()) iter := expected.Iter() @@ -302,7 +306,7 @@ func TestAvailableEmptyRangeError(t *testing.T) { require.NoError(t, err) res, err := src.AvailableData( testNsMetadata(t), - map[uint32]xtime.Ranges{0: xtime.Ranges{}}, + result.NewShardTimeRanges().Set(0, xtime.NewRanges()), testDefaultRunOpts, ) require.NoError(t, err) @@ -385,13 +389,16 @@ func TestAvailableTimeRangeFilter(t *testing.T) { ) require.NoError(t, err) require.NotNil(t, res) - require.Equal(t, 1, len(res)) - require.NotNil(t, res[testShard]) + require.Equal(t, 1, res.Len()) + _, ok := res.Get(testShard) + require.True(t, ok) - expected := xtime.Ranges{}. - AddRange(xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}). - AddRange(xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)}) - validateTimeRanges(t, res[testShard], expected) + expected := xtime.NewRanges( + xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}, + xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)}) + tr, ok := res.Get(testShard) + require.True(t, ok) + validateTimeRanges(t, tr, expected) } func TestAvailableTimeRangePartialError(t *testing.T) { @@ -412,13 +419,16 @@ func TestAvailableTimeRangePartialError(t *testing.T) { ) require.NoError(t, err) require.NotNil(t, res) - require.Equal(t, 1, len(res)) - require.NotNil(t, res[testShard]) + require.Equal(t, 1, res.Len()) + _, ok := res.Get(testShard) + require.True(t, ok) - expected := xtime.Ranges{}. - AddRange(xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}). - AddRange(xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)}) - validateTimeRanges(t, res[testShard], expected) + expected := xtime.NewRanges( + xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}, + xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)}) + tr, ok := res.Get(testShard) + require.True(t, ok) + validateTimeRanges(t, tr, expected) } // NB: too real :'( @@ -435,7 +445,7 @@ func TestReadEmptyRangeErr(t *testing.T) { src, err := newFileSystemSource(newTestOptions(t, "foo")) require.NoError(t, err) nsMD := testNsMetadata(t) - tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, nil, nsMD) + tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, result.NewShardTimeRanges(), nsMD) defer tester.Finish() unfulfilledAndEmpty(t, src, nsMD, tester) } @@ -443,7 +453,7 @@ func TestReadEmptyRangeErr(t *testing.T) { func TestReadPatternError(t *testing.T) { src, err := newFileSystemSource(newTestOptions(t, "[[")) require.NoError(t, err) - timeRanges := result.ShardTimeRanges{testShard: xtime.Ranges{}} + timeRanges := result.NewShardTimeRanges().Set(testShard, xtime.NewRanges()) nsMD := testNsMetadata(t) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, timeRanges, nsMD) @@ -497,10 +507,13 @@ func TestReadNilTimeRanges(t *testing.T) { src, err := newFileSystemSource(newTestOptions(t, dir)) require.NoError(t, err) - timeRanges := result.ShardTimeRanges{ - testShard: testTimeRanges(), - 555: xtime.Ranges{}, - } + timeRanges := result.NewShardTimeRanges().Set( + testShard, + testTimeRanges(), + ).Set( + 555, + xtime.NewRanges(), + ) validateReadResults(t, src, dir, timeRanges) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go index 5f9b763bcc..39ff66ec0d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go @@ -105,7 +105,7 @@ func BenchmarkBootstrapIndex(b *testing.B) { namespaceDataDirPath, shards) // Clear the shard time ranges and add new ones. - times.shardTimeRanges = make(result.ShardTimeRanges) + times.shardTimeRanges = result.NewShardTimeRanges() times.start = time.Unix(0, math.MaxInt64) times.end = time.Unix(0, 0) for _, shard := range shards { @@ -132,7 +132,7 @@ func BenchmarkBootstrapIndex(b *testing.B) { max = end } - ranges = ranges.AddRange(xtime.Range{Start: start, End: end}) + ranges.AddRange(xtime.Range{Start: start, End: end}) // Override the block size if different. namespaceOpts := testNamespaceMetadata.Options() @@ -157,7 +157,7 @@ func BenchmarkBootstrapIndex(b *testing.B) { continue // Nothing to bootstrap for shard. } - times.shardTimeRanges[shard] = ranges + times.shardTimeRanges.Set(shard, ranges) if min.Before(times.start) { times.start = min diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index a28f2d6674..42acd36447 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -76,12 +76,13 @@ func newTestBootstrapIndexTimes( panic("unexpected") } - shardTimeRanges := map[uint32]xtime.Ranges{ - testShard: xtime.Ranges{}.AddRange(xtime.Range{ + shardTimeRanges := result.NewShardTimeRanges().Set( + testShard, + xtime.NewRanges(xtime.Range{ Start: start, End: end, }), - } + ) return testBootstrapIndexTimes{ start: start, @@ -566,12 +567,13 @@ func TestBootstrapIndexWithPersistForIndexBlockAtRetentionEdge(t *testing.T) { require.NoError(t, err) // NB(bodu): Simulate requesting bootstrapping of two whole index blocks instead of 3 data blocks (1.5 index blocks). - times.shardTimeRanges = map[uint32]xtime.Ranges{ - testShard: xtime.Ranges{}.AddRange(xtime.Range{ + times.shardTimeRanges = result.NewShardTimeRanges().Set( + testShard, + xtime.NewRanges(xtime.Range{ Start: firstIndexBlockStart, End: times.end, }), - } + ) tester := bootstrap.BuildNamespacesTester(t, runOpts, times.shardTimeRanges, ns) defer tester.Finish() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index e78f323595..41273c4c7f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -260,7 +260,7 @@ func (s *peersSource) readData( persistenceMaxQueueSize = s.opts.PersistenceMaxQueueSize() persistenceQueue = make(chan persistenceFlush, persistenceMaxQueueSize) resultOpts = s.opts.ResultOptions() - count = len(shardsTimeRanges) + count = shardsTimeRanges.Len() concurrency = s.opts.DefaultShardConcurrency() blockSize = nsMetadata.Options().RetentionOptions().BlockSize() ) @@ -279,7 +279,7 @@ func (s *peersSource) readData( workers := xsync.NewWorkerPool(concurrency) workers.Init() - for shard, ranges := range shardsTimeRanges { + for shard, ranges := range shardsTimeRanges.Iter() { shard, ranges := shard, ranges wg.Add(1) workers.Go(func() { @@ -329,9 +329,10 @@ func (s *peersSource) startPersistenceQueueWorkerLoop( // Make unfulfilled. lock.Lock() unfulfilled := bootstrapResult.Unfulfilled().Copy() - unfulfilled.AddRanges(result.ShardTimeRanges{ - flush.shard: xtime.NewRanges(flush.timeRange), - }) + unfulfilled.AddRanges(result.NewShardTimeRanges().Set( + flush.shard, + xtime.NewRanges(flush.timeRange), + )) bootstrapResult.SetUnfulfilled(unfulfilled) lock.Unlock() } @@ -363,7 +364,7 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( unfulfill := func(r xtime.Range) { lock.Lock() unfulfilled := bootstrapResult.Unfulfilled() - unfulfilled.AddRanges(result.ShardTimeRanges{shard: xtime.NewRanges(r)}) + unfulfilled.AddRanges(result.NewShardTimeRanges().Set(shard, xtime.NewRanges(r))) lock.Unlock() } for it.Next() { @@ -651,7 +652,7 @@ func (s *peersSource) readIndex( } var ( - count = len(shardsTimeRanges) + count = shardsTimeRanges.Len() indexBlockSize = ns.Options().IndexOptions().BlockSize() runtimeOpts = s.opts.RuntimeOptionsManager().Get() fsOpts = s.opts.FilesystemOptions() @@ -766,17 +767,19 @@ func (s *peersSource) processReaders( if err == nil { // Mark index block as fulfilled. - fulfilled := result.ShardTimeRanges{ - shard: xtime.Ranges{}.AddRange(timeRange), - } + fulfilled := result.NewShardTimeRanges().Set( + shard, + xtime.NewRanges(timeRange), + ) err = r.IndexResults().MarkFulfilled(start, fulfilled, idxOpts) } if err == nil { - remainingRanges.Subtract(result.ShardTimeRanges{ - shard: xtime.Ranges{}.AddRange(timeRange), - }) + remainingRanges.Subtract(result.NewShardTimeRanges().Set( + shard, + xtime.NewRanges(timeRange), + )) } else { s.log.Error(err.Error(), zap.String("timeRange.start", fmt.Sprintf("%v", start))) @@ -913,7 +916,7 @@ func (s *peersSource) peerAvailability( initialTopologyState = runOpts.InitialTopologyState() ) - for shardIDUint := range shardsTimeRanges { + for shardIDUint := range shardsTimeRanges.Iter() { shardID := topology.ShardID(shardIDUint) shardPeers, ok := peerAvailabilityByShard[shardID] if !ok { @@ -958,9 +961,9 @@ func (s *peersSource) peerAvailability( runtimeOpts = s.opts.RuntimeOptionsManager().Get() bootstrapConsistencyLevel = runtimeOpts.ClientBootstrapConsistencyLevel() majorityReplicas = initialTopologyState.MajorityReplicas - availableShardTimeRanges = result.ShardTimeRanges{} + availableShardTimeRanges = result.NewShardTimeRanges() ) - for shardIDUint := range shardsTimeRanges { + for shardIDUint := range shardsTimeRanges.Iter() { var ( shardID = topology.ShardID(shardIDUint) shardPeers = peerAvailabilityByShard[shardID] @@ -992,7 +995,9 @@ func (s *peersSource) peerAvailability( // all the data. This assumption is safe, as the shard/block ranges // will simply be marked unfulfilled if the peers are not able to // satisfy the requests. - availableShardTimeRanges[shardIDUint] = shardsTimeRanges[shardIDUint] + if tr, ok := shardsTimeRanges.Get(shardIDUint); ok { + availableShardTimeRanges.Set(shardIDUint, tr) + } } return availableShardTimeRanges, nil @@ -1007,7 +1012,7 @@ func (s *peersSource) markIndexResultErrorAsUnfulfilled( ) { // NB(r): We explicitly do not remove entries from the index results // as they are additive and get merged together with results from other - // bootstrappers by just appending the result (unlike data bootstrap + // bootstrappers by just appending the result (ounlike data bootstrap // results that when merged replace the block with the current block). // It would also be difficult to remove only series that were added to the // index block as results from a specific data block can be subsets of the @@ -1016,9 +1021,10 @@ func (s *peersSource) markIndexResultErrorAsUnfulfilled( resultLock.Lock() defer resultLock.Unlock() - unfulfilled := result.ShardTimeRanges{ - shard: xtime.NewRanges(timeRange), - } + unfulfilled := result.NewShardTimeRanges().Set( + shard, + xtime.NewRanges(timeRange), + ) r.Add(result.IndexBlock{}, unfulfilled) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index 7083de3d2a..a86744543a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -142,7 +142,7 @@ func TestPeersSourceEmptyShardTimeRanges(t *testing.T) { var ( nsMetadata = testNamespaceMetadata(t) - target = result.ShardTimeRanges{} + target = result.NewShardTimeRanges() runOpts = testDefaultRunOpts.SetInitialTopologyState(&topology.StateSnapshot{}) ) available, err := src.AvailableData(nsMetadata, target, runOpts) @@ -176,10 +176,13 @@ func TestPeersSourceReturnsErrorForAdminSession(t *testing.T) { start := time.Now().Add(-ropts.RetentionPeriod()).Truncate(ropts.BlockSize()) end := start.Add(ropts.BlockSize()) - target := result.ShardTimeRanges{ - 0: xtime.NewRanges(xtime.Range{Start: start, End: end}), - 1: xtime.NewRanges(xtime.Range{Start: start, End: end}), - } + target := result.NewShardTimeRanges().Set( + 0, + xtime.NewRanges(xtime.Range{Start: start, End: end}), + ).Set( + 1, + xtime.NewRanges(xtime.Range{Start: start, End: end}), + ) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, nsMetadata) defer tester.Finish() @@ -228,9 +231,10 @@ func TestPeersSourceReturnsUnfulfilled(t *testing.T) { src, err := newPeersSource(opts) require.NoError(t, err) - target := result.ShardTimeRanges{ - 0: xtime.NewRanges(xtime.Range{Start: start, End: end}), - } + target := result.NewShardTimeRanges().Set( + 0, + xtime.NewRanges(xtime.Range{Start: start, End: end}), + ) tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, target, nsMetadata) defer tester.Finish() @@ -418,10 +422,13 @@ func TestPeersSourceRunWithPersist(t *testing.T) { src, err := newPeersSource(opts) require.NoError(t, err) - target := result.ShardTimeRanges{ - 0: xtime.NewRanges(xtime.Range{Start: start, End: end}), - 1: xtime.NewRanges(xtime.Range{Start: start, End: end}), - } + target := result.NewShardTimeRanges().Set( + 0, + xtime.NewRanges(xtime.Range{Start: start, End: end}), + ).Set( + 1, + xtime.NewRanges(xtime.Range{Start: start, End: end}), + ) tester := bootstrap.BuildNamespacesTester(t, testRunOptsWithPersist, target, testNsMd) defer tester.Finish() @@ -733,31 +740,45 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) { src, err := newPeersSource(opts) require.NoError(t, err) - target := result.ShardTimeRanges{ - 0: xtime.Ranges{}. - AddRange(xtime.Range{Start: start, End: midway}). - AddRange(xtime.Range{Start: midway, End: end}), - 1: xtime.Ranges{}. - AddRange(xtime.Range{Start: start, End: midway}). - AddRange(xtime.Range{Start: midway, End: end}), - 2: xtime.Ranges{}. - AddRange(xtime.Range{Start: start, End: midway}). - AddRange(xtime.Range{Start: midway, End: end}), - 3: xtime.Ranges{}. - AddRange(xtime.Range{Start: start, End: midway}). - AddRange(xtime.Range{Start: midway, End: end}), - } + target := result.NewShardTimeRanges().Set( + 0, + xtime.NewRanges( + xtime.Range{Start: start, End: midway}, + xtime.Range{Start: midway, End: end}), + ).Set( + 1, + xtime.NewRanges( + xtime.Range{Start: start, End: midway}, + xtime.Range{Start: midway, End: end}), + ).Set( + 2, + xtime.NewRanges( + xtime.Range{Start: start, End: midway}, + xtime.Range{Start: midway, End: end}), + ).Set( + 3, + xtime.NewRanges( + xtime.Range{Start: start, End: midway}, + xtime.Range{Start: midway, End: end}), + ) tester := bootstrap.BuildNamespacesTester(t, testRunOptsWithPersist, target, testNsMd) defer tester.Finish() tester.TestReadWith(src) - expectedRanges := result.ShardTimeRanges{ - 0: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}), - 1: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}), - 2: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}), - 3: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}), - } + expectedRanges := result.NewShardTimeRanges().Set( + 0, + xtime.NewRanges(xtime.Range{Start: start, End: midway}), + ).Set( + 1, + xtime.NewRanges(xtime.Range{Start: start, End: midway}), + ).Set( + 2, + xtime.NewRanges(xtime.Range{Start: start, End: midway}), + ).Set( + 3, + xtime.NewRanges(xtime.Range{Start: start, End: midway}), + ) // NB(bodu): There is no time series data written to disk so all ranges fail to be fulfilled. expectedIndexRanges := target diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go index 5fe4a1023c..59a7a84087 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go @@ -233,12 +233,13 @@ func TestBootstrapIndex(t *testing.T) { end := start.Add(ropts.RetentionPeriod()) - shardTimeRanges := map[uint32]xtime.Ranges{ - 0: xtime.NewRanges(xtime.Range{ + shardTimeRanges := result.NewShardTimeRanges().Set( + 0, + xtime.NewRanges(xtime.Range{ Start: start, End: end, }), - } + ) mockAdminSession := client.NewMockAdminSession(ctrl) mockAdminSession.EXPECT(). @@ -354,11 +355,11 @@ func TestBootstrapIndex(t *testing.T) { blk1, ok := indexResults[xtime.ToUnixNano(t1)] require.True(t, ok) - assertShardRangesEqual(t, result.NewShardTimeRanges(t1, t2, 0), blk1.Fulfilled()) + assertShardRangesEqual(t, result.NewShardTimeRangesFromRange(t1, t2, 0), blk1.Fulfilled()) blk2, ok := indexResults[xtime.ToUnixNano(t2)] require.True(t, ok) - assertShardRangesEqual(t, result.NewShardTimeRanges(t2, t3, 0), blk2.Fulfilled()) + assertShardRangesEqual(t, result.NewShardTimeRangesFromRange(t2, t3, 0), blk2.Fulfilled()) for _, blk := range indexResults { if blk.BlockStart().Equal(t1) || blk.BlockStart().Equal(t2) { @@ -368,7 +369,7 @@ func TestBootstrapIndex(t *testing.T) { // any errors in the response. start := blk.BlockStart() end := start.Add(indexBlockSize) - assertShardRangesEqual(t, result.NewShardTimeRanges(start, end, 0), blk.Fulfilled()) + assertShardRangesEqual(t, result.NewShardTimeRangesFromRange(start, end, 0), blk.Fulfilled()) } tester.EnsureNoWrites() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go index eba8f23150..a9f1e14b9c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go @@ -52,19 +52,19 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { nsMetadata = testNamespaceMetadata(t) numShards = uint32(4) blockStart = time.Now().Truncate(blockSize) - shardTimeRangesToBootstrap = result.ShardTimeRanges{} - bootstrapRanges = xtime.Ranges{}.AddRange(xtime.Range{ + shardTimeRangesToBootstrap = result.NewShardTimeRanges() + bootstrapRanges = xtime.NewRanges(xtime.Range{ Start: blockStart, End: blockStart.Add(blockSize), }) ) for i := 0; i < int(numShards); i++ { - shardTimeRangesToBootstrap[uint32(i)] = bootstrapRanges + shardTimeRangesToBootstrap.Set(uint32(i), bootstrapRanges) } shardTimeRangesToBootstrapOneExtra := shardTimeRangesToBootstrap.Copy() - shardTimeRangesToBootstrapOneExtra[100] = bootstrapRanges + shardTimeRangesToBootstrapOneExtra.Set(100, bootstrapRanges) testCases := []struct { title string @@ -81,7 +81,7 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { }), bootstrapReadConsistency: topology.ReadConsistencyLevelMajority, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, { title: "Returns empty if all other peers initializing/unknown", @@ -92,7 +92,7 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { }), bootstrapReadConsistency: topology.ReadConsistencyLevelMajority, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), expectedErr: errors.New("unknown shard state: Unknown"), }, { @@ -126,7 +126,7 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { }), bootstrapReadConsistency: topology.ReadConsistencyLevelAll, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, } @@ -188,10 +188,13 @@ func TestPeersSourceReturnsErrorIfUnknownPersistenceFileSetType(t *testing.T) { src, err := newPeersSource(opts) require.NoError(t, err) - target := result.ShardTimeRanges{ - 0: xtime.NewRanges(xtime.Range{Start: start, End: end}), - 1: xtime.NewRanges(xtime.Range{Start: start, End: end}), - } + target := result.NewShardTimeRanges().Set( + 0, + xtime.NewRanges(xtime.Range{Start: start, End: end}), + ).Set( + 1, + xtime.NewRanges(xtime.Range{Start: start, End: end}), + ) runOpts := testRunOptsWithPersist.SetPersistConfig(bootstrap.PersistConfig{Enabled: true, FileSetType: 999}) tester := bootstrap.BuildNamespacesTester(t, runOpts, target, testNsMd) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/persist.go b/src/dbnode/storage/bootstrap/bootstrapper/persist.go index 1df2856f00..241dbff885 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/persist.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/persist.go @@ -91,13 +91,13 @@ func PersistBootstrapIndexSegment( } shards := make(map[uint32]struct{}) - expectedRanges := make(result.ShardTimeRanges, len(requestedRanges)) - for shard := range requestedRanges { + expectedRanges := result.NewShardTimeRangesFromSize(requestedRanges.Len()) + for shard := range requestedRanges.Iter() { shards[shard] = struct{}{} - expectedRanges[shard] = xtime.Ranges{}.AddRange(xtime.Range{ + expectedRanges.Set(shard, xtime.NewRanges(xtime.Range{ Start: expectedRangeStart, End: expectedRangeEnd, - }) + })) } indexBlock, ok := indexResults[xtime.ToUnixNano(blockStart)] @@ -251,12 +251,12 @@ func BuildBootstrapIndexSegment( expectedRangeStart = earliestRetentionTime } - expectedRanges := make(result.ShardTimeRanges, len(requestedRanges)) - for shard := range requestedRanges { - expectedRanges[shard] = xtime.Ranges{}.AddRange(xtime.Range{ + expectedRanges := result.NewShardTimeRangesFromSize(requestedRanges.Len()) + for shard := range requestedRanges.Iter() { + expectedRanges.Set(shard, xtime.NewRanges(xtime.Range{ Start: expectedRangeStart, End: expectedRangeEnd, - }) + })) } indexBlock, ok := indexResults[xtime.ToUnixNano(blockStart)] diff --git a/src/dbnode/storage/bootstrap/bootstrapper/ranges.go b/src/dbnode/storage/bootstrap/bootstrapper/ranges.go index 1c5006c64a..6c394b9e63 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/ranges.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/ranges.go @@ -48,8 +48,8 @@ func NewShardTimeRangesTimeWindowGroups( End: minTime(t.Add(windowSize), max), } - group := make(result.ShardTimeRanges) - for shard, tr := range shardTimeRanges { + group := result.NewShardTimeRanges() + for shard, tr := range shardTimeRanges.Iter() { iter := tr.Iter() for iter.Next() { evaluateRange := iter.Value() @@ -58,7 +58,7 @@ func NewShardTimeRangesTimeWindowGroups( continue } // Add to this range. - group[shard] = group[shard].AddRange(intersection) + group.GetOrAdd(shard).AddRange(intersection) } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/readers.go b/src/dbnode/storage/bootstrap/bootstrapper/readers.go index d49e3a9a09..75ecc52b38 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/readers.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/readers.go @@ -94,8 +94,8 @@ func enqueueReadersGroupedByBlockSize( // Now enqueue across all shards by block size. for _, group := range groupedByBlockSize { - readers := make(map[ShardID]ShardReaders, len(group.Ranges)) - for shard, tr := range group.Ranges { + readers := make(map[ShardID]ShardReaders, group.Ranges.Len()) + for shard, tr := range group.Ranges.Iter() { shardReaders := newShardReaders(ns, fsOpts, readerPool, shard, tr, logger) readers[ShardID(shard)] = shardReaders } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go index c8dfa698fb..ce0c83b930 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go @@ -73,10 +73,10 @@ func (s *uninitializedTopologySource) availability( ) (result.ShardTimeRanges, error) { var ( topoState = runOpts.InitialTopologyState() - availableShardTimeRanges = result.ShardTimeRanges{} + availableShardTimeRanges = result.NewShardTimeRanges() ) - for shardIDUint := range shardsTimeRanges { + for shardIDUint := range shardsTimeRanges.Iter() { shardID := topology.ShardID(shardIDUint) hostShardStates, ok := topoState.ShardStates[shardID] if !ok { @@ -126,7 +126,9 @@ func (s *uninitializedTopologySource) availability( // factor to actually increase correctly. shardHasNeverBeenCompletelyInitialized := numInitializing-numLeaving > 0 if shardHasNeverBeenCompletelyInitialized { - availableShardTimeRanges[shardIDUint] = shardsTimeRanges[shardIDUint] + if tr, ok := shardsTimeRanges.Get(shardIDUint); ok { + availableShardTimeRanges.Set(shardIDUint, tr) + } } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go index ec3a8a3553..85125b6a80 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go @@ -51,8 +51,8 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { blockSize = 2 * time.Hour numShards = uint32(4) blockStart = time.Now().Truncate(blockSize) - shardTimeRangesToBootstrap = result.ShardTimeRanges{} - bootstrapRanges = xtime.Ranges{}.AddRange(xtime.Range{ + shardTimeRangesToBootstrap = result.NewShardTimeRanges() + bootstrapRanges = xtime.NewRanges(xtime.Range{ Start: blockStart, End: blockStart.Add(blockSize), }) @@ -63,7 +63,7 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { require.NoError(t, err) for i := 0; i < int(numShards); i++ { - shardTimeRangesToBootstrap[uint32(i)] = bootstrapRanges + shardTimeRangesToBootstrap.Set(uint32(i), bootstrapRanges) } testCases := []struct { @@ -101,7 +101,7 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { tu.SelfID: tu.ShardsRange(0, numShards, shard.Leaving), }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, // Snould return that it can't bootstrap anything because it's not // a new namespace. @@ -111,7 +111,7 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { tu.SelfID: tu.ShardsRange(0, numShards, shard.Available), }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, // Snould return that it can bootstrap everything because // it's a new namespace. @@ -148,7 +148,7 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { notSelfID2: tu.ShardsRange(0, numShards, shard.Available), }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, // Snould return that it can't bootstrap anything because it's not // a new namespace, we're just doing a node replace. @@ -161,7 +161,7 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { notSelfID3: tu.ShardsRange(0, numShards, shard.Initializing), }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedAvailableShardsTimeRanges: result.NewShardTimeRanges(), }, // Snould return that it can't bootstrap anything because we don't // know how to interpret the unknown host. diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index 8957482cd2..688d82c2b0 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -275,10 +275,10 @@ func (b bootstrapProcess) newShardTimeRanges( window xtime.Range, shards []uint32, ) result.ShardTimeRanges { - shardsTimeRanges := make(result.ShardTimeRanges, len(shards)) + shardsTimeRanges := result.NewShardTimeRanges() ranges := xtime.NewRanges(window) for _, s := range shards { - shardsTimeRanges[s] = ranges + shardsTimeRanges.Set(s, ranges) } return shardsTimeRanges } diff --git a/src/dbnode/storage/bootstrap/result/result_data.go b/src/dbnode/storage/bootstrap/result/result_data.go index 87838d7ae6..6189905bf8 100644 --- a/src/dbnode/storage/bootstrap/result/result_data.go +++ b/src/dbnode/storage/bootstrap/result/result_data.go @@ -34,7 +34,7 @@ type dataBootstrapResult struct { // NewDataBootstrapResult creates a new result. func NewDataBootstrapResult() DataBootstrapResult { return &dataBootstrapResult{ - unfulfilled: make(ShardTimeRanges), + unfulfilled: NewShardTimeRanges(), } } diff --git a/src/dbnode/storage/bootstrap/result/result_data_test.go b/src/dbnode/storage/bootstrap/result/result_data_test.go index 2e5eb242b3..e120783181 100644 --- a/src/dbnode/storage/bootstrap/result/result_data_test.go +++ b/src/dbnode/storage/bootstrap/result/result_data_test.go @@ -44,7 +44,7 @@ func testResultOptions() Options { func TestDataResultSetUnfulfilledMergeShardResults(t *testing.T) { start := time.Now().Truncate(testBlockSize) - rangeOne := ShardTimeRanges{ + rangeOne := shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(8 * testBlockSize), @@ -55,7 +55,7 @@ func TestDataResultSetUnfulfilledMergeShardResults(t *testing.T) { }), } - rangeTwo := ShardTimeRanges{ + rangeTwo := shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start.Add(6 * testBlockSize), End: start.Add(10 * testBlockSize), @@ -81,7 +81,7 @@ func TestDataResultSetUnfulfilledMergeShardResults(t *testing.T) { assert.True(t, rMerged.Unfulfilled().Equal(rangeOne)) rMerged = MergedDataBootstrapResult(r, rTwo) - expected := ShardTimeRanges{ + expected := shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(10 * testBlockSize), @@ -101,27 +101,27 @@ func TestDataResultSetUnfulfilledMergeShardResults(t *testing.T) { func TestDataResultSetUnfulfilledOverwitesUnfulfilled(t *testing.T) { start := time.Now().Truncate(testBlockSize) r := NewDataBootstrapResult() - r.SetUnfulfilled(ShardTimeRanges{ + r.SetUnfulfilled(shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(8 * testBlockSize), }), }) - expected := ShardTimeRanges{0: xtime.NewRanges(xtime.Range{ + expected := shardTimeRanges{0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(8 * testBlockSize), })} assert.True(t, r.Unfulfilled().Equal(expected)) - r.SetUnfulfilled(ShardTimeRanges{ + r.SetUnfulfilled(shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start.Add(6 * testBlockSize), End: start.Add(10 * testBlockSize), }), }) - expected = ShardTimeRanges{0: xtime.NewRanges(xtime.Range{ + expected = shardTimeRanges{0: xtime.NewRanges(xtime.Range{ Start: start.Add(6 * testBlockSize), End: start.Add(10 * testBlockSize), })} @@ -133,7 +133,7 @@ func TestResultSetUnfulfilled(t *testing.T) { start := time.Now().Truncate(testBlockSize) r := NewDataBootstrapResult() - r.SetUnfulfilled(ShardTimeRanges{ + r.SetUnfulfilled(shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(2 * testBlockSize), @@ -143,14 +143,14 @@ func TestResultSetUnfulfilled(t *testing.T) { End: start.Add(2 * testBlockSize), }), }) - r.SetUnfulfilled(ShardTimeRanges{ + r.SetUnfulfilled(shardTimeRanges{ 1: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(2 * testBlockSize), }), }) - assert.True(t, r.Unfulfilled().Equal(ShardTimeRanges{ + assert.True(t, r.Unfulfilled().Equal(shardTimeRanges{ 1: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(2 * testBlockSize), @@ -273,17 +273,17 @@ func TestShardResultRemoveSeries(t *testing.T) { } func TestShardTimeRangesIsEmpty(t *testing.T) { - assert.True(t, ShardTimeRanges{}.IsEmpty()) - assert.True(t, ShardTimeRanges{0: xtime.Ranges{}, 1: xtime.Ranges{}}.IsEmpty()) - assert.True(t, ShardTimeRanges{0: xtime.NewRanges(xtime.Range{})}.IsEmpty()) - assert.False(t, ShardTimeRanges{0: xtime.NewRanges(xtime.Range{ + assert.True(t, shardTimeRanges{}.IsEmpty()) + assert.True(t, shardTimeRanges{0: xtime.NewRanges(), 1: xtime.NewRanges()}.IsEmpty()) + assert.True(t, shardTimeRanges{0: xtime.NewRanges(xtime.Range{})}.IsEmpty()) + assert.False(t, shardTimeRanges{0: xtime.NewRanges(xtime.Range{ Start: time.Now(), End: time.Now().Add(time.Second), })}.IsEmpty()) } func TestShardTimeRangesCopy(t *testing.T) { - str := ShardTimeRanges{0: xtime.NewRanges(xtime.Range{ + str := shardTimeRanges{0: xtime.NewRanges(xtime.Range{ Start: time.Now(), End: time.Now().Add(time.Second), })} @@ -294,7 +294,7 @@ func TestShardTimeRangesCopy(t *testing.T) { } func TestShardTimeRangesToUnfulfilledDataResult(t *testing.T) { - str := ShardTimeRanges{ + str := shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: time.Now(), End: time.Now().Add(time.Minute), @@ -311,7 +311,7 @@ func TestShardTimeRangesToUnfulfilledDataResult(t *testing.T) { func TestShardTimeRangesSubtract(t *testing.T) { start := time.Now().Truncate(testBlockSize) - str := ShardTimeRanges{ + str := shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(2 * testBlockSize), @@ -321,7 +321,7 @@ func TestShardTimeRangesSubtract(t *testing.T) { End: start.Add(2 * testBlockSize), }), } - str.Subtract(ShardTimeRanges{ + str.Subtract(shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(testBlockSize), @@ -332,7 +332,7 @@ func TestShardTimeRangesSubtract(t *testing.T) { }), }) - assert.True(t, str.Equal(ShardTimeRanges{ + assert.True(t, str.Equal(shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start.Add(testBlockSize), End: start.Add(2 * testBlockSize), @@ -348,7 +348,7 @@ func TestShardTimeRangesMinMax(t *testing.T) { start := time.Now().Truncate(testBlockSize) - str := ShardTimeRanges{ + str := shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(testBlockSize), @@ -374,14 +374,10 @@ func TestShardTimeRangesString(t *testing.T) { []time.Time{start, start.Add(2 * testBlockSize)}, } - str := ShardTimeRanges{ - 0: xtime.NewRanges(xtime.Range{ - Start: ts[0][0], - End: ts[0][1], - }).AddRange(xtime.Range{ - Start: ts[1][0], - End: ts[1][1], - }), + str := shardTimeRanges{ + 0: xtime.NewRanges( + xtime.Range{Start: ts[0][0], End: ts[0][1]}, + xtime.Range{Start: ts[1][0], End: ts[1][1]}), 1: xtime.NewRanges(xtime.Range{ Start: ts[2][0], End: ts[2][1], @@ -399,14 +395,10 @@ func TestShardTimeRangesString(t *testing.T) { func TestShardTimeRangesSummaryString(t *testing.T) { start := time.Unix(1472824800, 0) - str := ShardTimeRanges{ - 0: xtime.NewRanges(xtime.Range{ - Start: start, - End: start.Add(testBlockSize), - }).AddRange(xtime.Range{ - Start: start.Add(2 * testBlockSize), - End: start.Add(4 * testBlockSize), - }), + str := shardTimeRanges{ + 0: xtime.NewRanges( + xtime.Range{Start: start, End: start.Add(testBlockSize)}, + xtime.Range{Start: start.Add(2 * testBlockSize), End: start.Add(4 * testBlockSize)}), 1: xtime.NewRanges(xtime.Range{ Start: start, End: start.Add(2 * testBlockSize), diff --git a/src/dbnode/storage/bootstrap/result/result_index.go b/src/dbnode/storage/bootstrap/result/result_index.go index 96270f1c01..40fc665d9e 100644 --- a/src/dbnode/storage/bootstrap/result/result_index.go +++ b/src/dbnode/storage/bootstrap/result/result_index.go @@ -49,7 +49,7 @@ type indexBootstrapResult struct { func NewIndexBootstrapResult() IndexBootstrapResult { return &indexBootstrapResult{ results: make(IndexResults), - unfulfilled: make(ShardTimeRanges), + unfulfilled: NewShardTimeRanges(), } } @@ -241,7 +241,7 @@ func NewIndexBlock( fulfilled ShardTimeRanges, ) IndexBlock { if fulfilled == nil { - fulfilled = ShardTimeRanges{} + fulfilled = NewShardTimeRanges() } return IndexBlock{ blockStart: blockStart, diff --git a/src/dbnode/storage/bootstrap/result/result_index_test.go b/src/dbnode/storage/bootstrap/result/result_index_test.go index 98b23218e8..687f11b94e 100644 --- a/src/dbnode/storage/bootstrap/result/result_index_test.go +++ b/src/dbnode/storage/bootstrap/result/result_index_test.go @@ -49,8 +49,8 @@ func TestIndexResultMergeMergesExistingSegments(t *testing.T) { } times := []time.Time{start, start.Add(testBlockSize), start.Add(2 * testBlockSize)} - tr0 := NewShardTimeRanges(times[0], times[1], 1, 2, 3) - tr1 := NewShardTimeRanges(times[1], times[2], 1, 2, 3) + tr0 := NewShardTimeRangesFromRange(times[0], times[1], 1, 2, 3) + tr1 := NewShardTimeRangesFromRange(times[1], times[2], 1, 2, 3) first := NewIndexBootstrapResult() first.Add(NewIndexBlock(times[0], []segment.Segment{segments[0]}, tr0), nil) @@ -79,7 +79,7 @@ func TestIndexResultSetUnfulfilled(t *testing.T) { return t0.Add(time.Duration(i) * time.Hour) } results := NewIndexBootstrapResult() - testRanges := NewShardTimeRanges(tn(0), tn(1), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + testRanges := NewShardTimeRangesFromRange(tn(0), tn(1), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) results.SetUnfulfilled(testRanges) require.Equal(t, testRanges, results.Unfulfilled()) } @@ -93,13 +93,13 @@ func TestIndexResultAdd(t *testing.T) { return t0.Add(time.Duration(i) * time.Hour) } results := NewIndexBootstrapResult() - testRanges := NewShardTimeRanges(tn(0), tn(1), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + testRanges := NewShardTimeRangesFromRange(tn(0), tn(1), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) results.Add(IndexBlock{}, testRanges) require.Equal(t, testRanges, results.Unfulfilled()) } func TestShardTimeRangesToUnfulfilledIndexResult(t *testing.T) { - str := ShardTimeRanges{ + str := shardTimeRanges{ 0: xtime.NewRanges(xtime.Range{ Start: time.Now(), End: time.Now().Add(time.Minute), @@ -127,12 +127,12 @@ func TestIndexResulsMarkFulfilled(t *testing.T) { // range checks require.Error(t, results.MarkFulfilled(tn(0), - NewShardTimeRanges(tn(4), tn(6), 1), iopts)) + NewShardTimeRangesFromRange(tn(4), tn(6), 1), iopts)) require.Error(t, results.MarkFulfilled(tn(0), - NewShardTimeRanges(tn(-1), tn(1), 1), iopts)) + NewShardTimeRangesFromRange(tn(-1), tn(1), 1), iopts)) // valid add - fulfilledRange := NewShardTimeRanges(tn(0), tn(1), 1) + fulfilledRange := NewShardTimeRangesFromRange(tn(0), tn(1), 1) require.NoError(t, results.MarkFulfilled(tn(0), fulfilledRange, iopts)) require.Equal(t, 1, len(results)) blk, ok := results[xtime.ToUnixNano(tn(0))] @@ -141,7 +141,7 @@ func TestIndexResulsMarkFulfilled(t *testing.T) { require.Equal(t, fulfilledRange, blk.fulfilled) // additional add for same block - nextFulfilledRange := NewShardTimeRanges(tn(1), tn(2), 2) + nextFulfilledRange := NewShardTimeRangesFromRange(tn(1), tn(2), 2) require.NoError(t, results.MarkFulfilled(tn(1), nextFulfilledRange, iopts)) require.Equal(t, 1, len(results)) blk, ok = results[xtime.ToUnixNano(tn(0))] @@ -151,7 +151,7 @@ func TestIndexResulsMarkFulfilled(t *testing.T) { require.Equal(t, fulfilledRange, blk.fulfilled) // additional add for next block - nextFulfilledRange = NewShardTimeRanges(tn(2), tn(4), 1, 2, 3) + nextFulfilledRange = NewShardTimeRangesFromRange(tn(2), tn(4), 1, 2, 3) require.NoError(t, results.MarkFulfilled(tn(2), nextFulfilledRange, iopts)) require.Equal(t, 2, len(results)) blk, ok = results[xtime.ToUnixNano(tn(2))] diff --git a/src/dbnode/storage/bootstrap/result/shard_ranges.go b/src/dbnode/storage/bootstrap/result/shard_ranges.go index 2d547b913a..aea75365b3 100644 --- a/src/dbnode/storage/bootstrap/result/shard_ranges.go +++ b/src/dbnode/storage/bootstrap/result/shard_ranges.go @@ -29,18 +29,58 @@ import ( xtime "github.com/m3db/m3/src/x/time" ) -// NewShardTimeRanges returns a new ShardTimeRanges with provided shards and time range. -func NewShardTimeRanges(start, end time.Time, shards ...uint32) ShardTimeRanges { +// NewShardTimeRangesFromRange returns a new ShardTimeRanges with provided shards and time range. +func NewShardTimeRangesFromRange(start, end time.Time, shards ...uint32) ShardTimeRanges { timeRange := xtime.NewRanges(xtime.Range{Start: start, End: end}) - ranges := make(map[uint32]xtime.Ranges) + ranges := make(shardTimeRanges, len(shards)) for _, s := range shards { ranges[s] = timeRange } return ranges } +// NewShardTimeRangesFromSize returns a new ShardTimeRanges with provided shards and time range. +func NewShardTimeRangesFromSize(size int) ShardTimeRanges { + return make(shardTimeRanges, size) +} + +// NewShardTimeRanges returns an empty ShardTimeRanges. +func NewShardTimeRanges() ShardTimeRanges { + return make(shardTimeRanges) +} + +// Get time ranges for a shard. +func (r shardTimeRanges) Get(shard uint32) (xtime.Ranges, bool) { + tr, ok := r[shard] + return tr, ok +} + +// Set time ranges for a shard. +func (r shardTimeRanges) Set(shard uint32, ranges xtime.Ranges) ShardTimeRanges { + r[shard] = ranges + return r +} + +// GetOrAdd gets or adds time ranges for a shard. +func (r shardTimeRanges) GetOrAdd(shard uint32) xtime.Ranges { + if r[shard] == nil { + r[shard] = xtime.NewRanges() + } + return r[shard] +} + +// Len returns then number of shards. +func (r shardTimeRanges) Len() int { + return len(r) +} + +// Iter returns the underlying map. +func (r shardTimeRanges) Iter() map[uint32]xtime.Ranges { + return r +} + // IsEmpty returns whether the shard time ranges is empty or not. -func (r ShardTimeRanges) IsEmpty() bool { +func (r shardTimeRanges) IsEmpty() bool { for _, ranges := range r { if !ranges.IsEmpty() { return false @@ -50,13 +90,13 @@ func (r ShardTimeRanges) IsEmpty() bool { } // Equal returns whether two shard time ranges are equal. -func (r ShardTimeRanges) Equal(other ShardTimeRanges) bool { - if len(r) != len(other) { +func (r shardTimeRanges) Equal(other ShardTimeRanges) bool { + if len(r) != other.Len() { return false } for shard, ranges := range r { - otherRanges, ok := other[shard] - if !ok { + otherRanges := other.GetOrAdd(shard) + if otherRanges == nil { return false } if ranges.Len() != otherRanges.Len() { @@ -77,22 +117,28 @@ func (r ShardTimeRanges) Equal(other ShardTimeRanges) bool { } // Copy will return a copy of the current shard time ranges. -func (r ShardTimeRanges) Copy() ShardTimeRanges { - result := make(map[uint32]xtime.Ranges, len(r)) +func (r shardTimeRanges) Copy() ShardTimeRanges { + result := make(shardTimeRanges, len(r)) for shard, ranges := range r { - result[shard] = xtime.Ranges{}.AddRanges(ranges) + newRanges := xtime.NewRanges() + newRanges.AddRanges(ranges) + result[shard] = newRanges } return result } // AddRanges adds other shard time ranges to the current shard time ranges. -func (r ShardTimeRanges) AddRanges(other ShardTimeRanges) { - for shard, ranges := range other { +func (r shardTimeRanges) AddRanges(other ShardTimeRanges) { + if other == nil { + return + } + for shard, ranges := range other.Iter() { if ranges.IsEmpty() { continue } if existing, ok := r[shard]; ok { - r[shard] = existing.AddRanges(ranges) + existing.AddRanges(ranges) + r[shard] = existing } else { r[shard] = ranges } @@ -101,7 +147,7 @@ func (r ShardTimeRanges) AddRanges(other ShardTimeRanges) { // ToUnfulfilledDataResult will return a result that is comprised of wholly // unfufilled time ranges from the set of shard time ranges. -func (r ShardTimeRanges) ToUnfulfilledDataResult() DataBootstrapResult { +func (r shardTimeRanges) ToUnfulfilledDataResult() DataBootstrapResult { result := NewDataBootstrapResult() result.SetUnfulfilled(r.Copy()) return result @@ -109,21 +155,25 @@ func (r ShardTimeRanges) ToUnfulfilledDataResult() DataBootstrapResult { // ToUnfulfilledIndexResult will return a result that is comprised of wholly // unfufilled time ranges from the set of shard time ranges. -func (r ShardTimeRanges) ToUnfulfilledIndexResult() IndexBootstrapResult { +func (r shardTimeRanges) ToUnfulfilledIndexResult() IndexBootstrapResult { result := NewIndexBootstrapResult() result.SetUnfulfilled(r.Copy()) return result } // Subtract will subtract another range from the current range. -func (r ShardTimeRanges) Subtract(other ShardTimeRanges) { +func (r shardTimeRanges) Subtract(other ShardTimeRanges) { + if other == nil { + return + } for shard, ranges := range r { - otherRanges, ok := other[shard] + otherRanges, ok := other.Get(shard) if !ok { continue } - subtractedRanges := ranges.RemoveRanges(otherRanges) + subtractedRanges := ranges.Clone() + subtractedRanges.RemoveRanges(otherRanges) if subtractedRanges.IsEmpty() { delete(r, shard) } else { @@ -134,7 +184,7 @@ func (r ShardTimeRanges) Subtract(other ShardTimeRanges) { // MinMax will return the very minimum time as a start and the // maximum time as an end in the ranges. -func (r ShardTimeRanges) MinMax() (time.Time, time.Time) { +func (r shardTimeRanges) MinMax() (time.Time, time.Time) { min, max := time.Time{}, time.Time{} for _, ranges := range r { if ranges.IsEmpty() { @@ -155,17 +205,17 @@ func (r ShardTimeRanges) MinMax() (time.Time, time.Time) { } // MinMaxRange returns the min and max times, and the duration for this range. -func (r ShardTimeRanges) MinMaxRange() (time.Time, time.Time, time.Duration) { +func (r shardTimeRanges) MinMaxRange() (time.Time, time.Time, time.Duration) { min, max := r.MinMax() return min, max, max.Sub(min) } type summaryFn func(xtime.Ranges) string -func (r ShardTimeRanges) summarize(sfn summaryFn) string { - values := make([]shardTimeRanges, 0, len(r)) +func (r shardTimeRanges) summarize(sfn summaryFn) string { + values := make([]shardTimeRangesPair, 0, len(r)) for shard, ranges := range r { - values = append(values, shardTimeRanges{shard: shard, value: ranges}) + values = append(values, shardTimeRangesPair{shard: shard, value: ranges}) } sort.Sort(shardTimeRangesByShard(values)) @@ -190,7 +240,7 @@ func (r ShardTimeRanges) summarize(sfn summaryFn) string { } // String returns a description of the time ranges -func (r ShardTimeRanges) String() string { +func (r shardTimeRanges) String() string { return r.summarize(xtime.Ranges.String) } @@ -207,16 +257,16 @@ func rangesDuration(ranges xtime.Ranges) string { } // SummaryString returns a summary description of the time ranges -func (r ShardTimeRanges) SummaryString() string { +func (r shardTimeRanges) SummaryString() string { return r.summarize(rangesDuration) } -type shardTimeRanges struct { +type shardTimeRangesPair struct { shard uint32 value xtime.Ranges } -type shardTimeRangesByShard []shardTimeRanges +type shardTimeRangesByShard []shardTimeRangesPair func (str shardTimeRangesByShard) Len() int { return len(str) } func (str shardTimeRangesByShard) Swap(i, j int) { str[i], str[j] = str[j], str[i] } diff --git a/src/dbnode/storage/bootstrap/result/types.go b/src/dbnode/storage/bootstrap/result/types.go index 4f8dd644e9..cc3ef06c6e 100644 --- a/src/dbnode/storage/bootstrap/result/types.go +++ b/src/dbnode/storage/bootstrap/result/types.go @@ -123,7 +123,59 @@ type DatabaseSeriesBlocks struct { type ShardResults map[uint32]ShardResult // ShardTimeRanges is a map of shards to time ranges. -type ShardTimeRanges map[uint32]xtime.Ranges +type ShardTimeRanges interface { + // Get time ranges for a shard. + Get(shard uint32) (xtime.Ranges, bool) + + // Set time ranges for a shard. + Set(shard uint32, ranges xtime.Ranges) ShardTimeRanges + + // GetOrAdd gets or adds time ranges for a shard. + GetOrAdd(shard uint32) xtime.Ranges + + // AddRanges adds other shard time ranges to the current shard time ranges. + AddRanges(ranges ShardTimeRanges) + + // Iter returns the underlying map. + Iter() map[uint32]xtime.Ranges + + Copy() ShardTimeRanges + + // Equal returns whether two shard time ranges are equal. + Equal(other ShardTimeRanges) bool + + // ToUnfulfilledDataResult will return a result that is comprised of wholly + // unfufilled time ranges from the set of shard time ranges. + ToUnfulfilledDataResult() DataBootstrapResult + + // ToUnfulfilledIndexResult will return a result that is comprised of wholly + // unfufilled time ranges from the set of shard time ranges. + ToUnfulfilledIndexResult() IndexBootstrapResult + + // Subtract will subtract another range from the current range. + Subtract(other ShardTimeRanges) + + // MinMax will return the very minimum time as a start and the + // maximum time as an end in the ranges. + MinMax() (time.Time, time.Time) + + // MinMaxRange returns the min and max times, and the duration for this range. + MinMaxRange() (time.Time, time.Time, time.Duration) + + // String returns a description of the time ranges + String() string + + // SummaryString returns a summary description of the time ranges + SummaryString() string + + // IsEmpty returns whether the shard time ranges is empty or not. + IsEmpty() bool + + // Len returns the number of shards + Len() int +} + +type shardTimeRanges map[uint32]xtime.Ranges // Options represents the options for bootstrap results. type Options interface { diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index f213f4bca3..9422c8e84a 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -347,8 +347,8 @@ func BuildNamespacesTesterWithReaderIteratorPool( iterPool encoding.MultiReaderIteratorPool, mds ...namespace.Metadata, ) NamespacesTester { - shards := make([]uint32, 0, len(ranges)) - for shard := range ranges { + shards := make([]uint32, 0, ranges.Len()) + for shard := range ranges.Iter() { shards = append(shards, shard) } @@ -551,14 +551,16 @@ func (nt *NamespacesTester) TestReadWith(s Source) { func validateRanges(ac xtime.Ranges, ex xtime.Ranges) error { // Make range eclipses expected. - removedRange := ex.RemoveRanges(ac) + removedRange := ex.Clone() + removedRange.RemoveRanges(ac) if !removedRange.IsEmpty() { return fmt.Errorf("actual range %v does not match expected range %v "+ "diff: %v", ac, ex, removedRange) } // Now make sure no ranges outside of expected. - expectedWithAddedRanges := ex.AddRanges(ac) + expectedWithAddedRanges := ex.Clone() + expectedWithAddedRanges.AddRanges(ac) if ex.Len() != expectedWithAddedRanges.Len() { return fmt.Errorf("expected with re-added ranges not equal") } @@ -579,14 +581,14 @@ func validateShardTimeRanges( r result.ShardTimeRanges, ex result.ShardTimeRanges, ) error { - if len(ex) != len(r) { + if ex.Len() != r.Len() { return fmt.Errorf("expected %v and actual %v size mismatch", ex, r) } - seen := make(map[uint32]struct{}, len(r)) - for k, val := range r { - expectedVal, found := ex[k] - if !found { + seen := make(map[uint32]struct{}, r.Len()) + for k, val := range r.Iter() { + expectedVal, ok := ex.Get(k) + if !ok { return fmt.Errorf("expected shard map %v does not have shard %d; "+ "actual: %v", ex, k, r) } @@ -598,7 +600,7 @@ func validateShardTimeRanges( seen[k] = struct{}{} } - for k := range ex { + for k := range ex.Iter() { if _, beenFound := seen[k]; !beenFound { return fmt.Errorf("shard %d in actual not found in expected %v", k, ex) } @@ -704,7 +706,7 @@ var _ gomock.Matcher = (*NamespaceMatcher)(nil) // ShardTimeRangesMatcher is a matcher for ShardTimeRanges. type ShardTimeRangesMatcher struct { // Ranges are the expected ranges. - Ranges map[uint32]xtime.Ranges + Ranges result.ShardTimeRanges } // Matches returns whether x is a match. diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 97bb398a11..7620419b4b 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -761,7 +761,7 @@ func (i *nsIndex) Flush( } // Make a result that covers the entire time ranges for the // block for each shard - fulfilled := result.NewShardTimeRanges(block.StartTime(), block.EndTime(), + fulfilled := result.NewShardTimeRangesFromRange(block.StartTime(), block.EndTime(), dbShards(shards).IDs()...) // Add the results to the block results := result.NewIndexBlock(block.StartTime(), immutableSegments, @@ -1353,7 +1353,7 @@ func (i *nsIndex) blocksForQueryWithRLock(queryRange xtime.Ranges) ([]index.Bloc } // Remove this range from the query range. - queryRange = queryRange.RemoveRange(blockRange) + queryRange.RemoveRange(blockRange) blocks = append(blocks, block) } diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index b0e90f13d5..29a527920f 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -1240,7 +1240,7 @@ func (b *block) AddResults( // first see if this block can cover all our current blocks covering shard // time ranges. - currFulfilled := make(result.ShardTimeRanges) + currFulfilled := result.NewShardTimeRanges() for _, existing := range b.shardRangesSegments { currFulfilled.AddRanges(existing.shardTimeRanges) } diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 8bf9974796..03dce3f83a 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -184,7 +184,7 @@ func newPropTestBlock(t *testing.T, blockStart time.Time, nsMeta namespace.Metad memSeg = testSegment(t, lotsTestDocuments...).(segment.MutableSegment) fstSeg = fst.ToTestSegment(t, memSeg, testFstOptions) // Need at least one shard to look fulfilled. - fulfilled = result.NewShardTimeRanges(blockStart, blockStart.Add(testBlockSize), uint32(1)) + fulfilled = result.NewShardTimeRangesFromRange(blockStart, blockStart.Add(testBlockSize), uint32(1)) indexBlock = result.NewIndexBlock(blockStart, []segment.Segment{fstSeg}, fulfilled) ) // Use the AddResults API because thats the only scenario in which we'll wrap a segment diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index db82cfb94e..c537145d77 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -843,7 +843,7 @@ func TestBlockAddResultsAddsSegment(t *testing.T) { seg1 := segment.NewMockMutableSegment(ctrl) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) require.Equal(t, 1, len(b.shardRangesSegments)) require.Equal(t, seg1, b.shardRangesSegments[0].segments[0]) @@ -862,7 +862,7 @@ func TestBlockAddResultsAfterCloseFails(t *testing.T) { seg1 := segment.NewMockMutableSegment(ctrl) require.Error(t, blk.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) } func TestBlockAddResultsAfterSealWorks(t *testing.T) { @@ -881,7 +881,7 @@ func TestBlockAddResultsAfterSealWorks(t *testing.T) { seg1 := segment.NewMockMutableSegment(ctrl) require.NoError(t, blk.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) require.Equal(t, 1, len(b.shardRangesSegments)) require.Equal(t, seg1, b.shardRangesSegments[0].segments[0]) @@ -929,7 +929,7 @@ func TestBlockTickMultipleSegment(t *testing.T) { seg2.EXPECT().Size().Return(int64(20)) require.NoError(t, blk.AddResults( result.NewIndexBlock(start, []segment.Segment{seg2}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) result, err := blk.Tick(nil) require.NoError(t, err) @@ -989,10 +989,10 @@ func TestBlockAddResultsRangeCheck(t *testing.T) { seg1 := segment.NewMockMutableSegment(ctrl) require.Error(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start.Add(-1*time.Minute), start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start.Add(-1*time.Minute), start.Add(time.Hour), 1, 2, 3)))) require.Error(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(2*time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(2*time.Hour), 1, 2, 3)))) } func TestBlockAddResultsCoversCurrentData(t *testing.T) { @@ -1010,13 +1010,13 @@ func TestBlockAddResultsCoversCurrentData(t *testing.T) { seg1 := segment.NewMockMutableSegment(ctrl) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) seg2 := segment.NewMockMutableSegment(ctrl) seg1.EXPECT().Close().Return(nil) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg2}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3, 4)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3, 4)))) require.NoError(t, b.Seal()) seg2.EXPECT().Close().Return(nil) @@ -1038,12 +1038,12 @@ func TestBlockAddResultsDoesNotCoverCurrentData(t *testing.T) { seg1 := segment.NewMockMutableSegment(ctrl) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) seg2 := segment.NewMockMutableSegment(ctrl) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg2}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 5)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 5)))) require.NoError(t, b.Seal()) @@ -1104,7 +1104,7 @@ func TestBlockNeedsMutableSegmentsEvictedMutableSegments(t *testing.T) { seg1.EXPECT().Size().Return(int64(0)).AnyTimes() require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) require.False(t, b.NeedsMutableSegmentsEvicted()) seg2 := segment.NewMockMutableSegment(ctrl) @@ -1112,7 +1112,7 @@ func TestBlockNeedsMutableSegmentsEvictedMutableSegments(t *testing.T) { seg3 := segment.NewMockSegment(ctrl) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg2, seg3}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 4)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 4)))) require.True(t, b.NeedsMutableSegmentsEvicted()) } @@ -1148,7 +1148,7 @@ func TestBlockEvictMutableSegmentsAddResults(t *testing.T) { seg1 := segment.NewMockMutableSegment(ctrl) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg1}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 3)))) seg1.EXPECT().Close().Return(nil) err = b.EvictMutableSegments() require.NoError(t, err) @@ -1157,7 +1157,7 @@ func TestBlockEvictMutableSegmentsAddResults(t *testing.T) { seg3 := segment.NewMockSegment(ctrl) require.NoError(t, b.AddResults( result.NewIndexBlock(start, []segment.Segment{seg2, seg3}, - result.NewShardTimeRanges(start, start.Add(time.Hour), 1, 2, 4)))) + result.NewShardTimeRangesFromRange(start, start.Add(time.Hour), 1, 2, 4)))) seg2.EXPECT().Close().Return(nil) err = b.EvictMutableSegments() require.NoError(t, err) @@ -1375,7 +1375,7 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) { seg := testSegment(t, testDoc1DupeID()) require.NoError(t, blk.AddResults( result.NewIndexBlock(blockStart, []segment.Segment{seg}, - result.NewShardTimeRanges(blockStart, blockStart.Add(blockSize), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(blockStart, blockStart.Add(blockSize), 1, 2, 3)))) q, err := idx.NewRegexpQuery([]byte("bar"), []byte("b.*")) require.NoError(t, err) @@ -1451,7 +1451,7 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) { seg := testSegment(t, testDoc2()) require.NoError(t, blk.AddResults( result.NewIndexBlock(blockStart, []segment.Segment{seg}, - result.NewShardTimeRanges(blockStart, blockStart.Add(blockSize), 1, 2, 3)))) + result.NewShardTimeRangesFromRange(blockStart, blockStart.Add(blockSize), 1, 2, 3)))) q, err := idx.NewRegexpQuery([]byte("bar"), []byte("b.*")) require.NoError(t, err) diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index 28d0867a85..da65ce7277 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -374,8 +374,8 @@ func TestNamespaceIndexBootstrap(t *testing.T) { seg2 := segment.NewMockSegment(ctrl) seg3 := segment.NewMockSegment(ctrl) bootstrapResults := result.IndexResults{ - t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRanges(t0, t1, 1, 2, 3)), - t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRanges(t1, t2, 1, 2, 3)), + t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRangesFromRange(t0, t1, 1, 2, 3)), + t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRangesFromRange(t1, t2, 1, 2, 3)), } b0.EXPECT().AddResults(bootstrapResults[t0Nanos]).Return(nil) @@ -574,8 +574,8 @@ func TestNamespaceIndexBlockQuery(t *testing.T) { seg2 := segment.NewMockSegment(ctrl) seg3 := segment.NewMockSegment(ctrl) bootstrapResults := result.IndexResults{ - t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRanges(t0, t1, 1, 2, 3)), - t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRanges(t1, t2, 1, 2, 3)), + t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRangesFromRange(t0, t1, 1, 2, 3)), + t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRangesFromRange(t1, t2, 1, 2, 3)), } b0.EXPECT().AddResults(bootstrapResults[t0Nanos]).Return(nil) @@ -690,8 +690,8 @@ func TestNamespaceIndexBlockQueryReleasingContext(t *testing.T) { seg2 := segment.NewMockSegment(ctrl) seg3 := segment.NewMockSegment(ctrl) bootstrapResults := result.IndexResults{ - t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRanges(t0, t1, 1, 2, 3)), - t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRanges(t1, t2, 1, 2, 3)), + t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRangesFromRange(t0, t1, 1, 2, 3)), + t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRangesFromRange(t1, t2, 1, 2, 3)), } b0.EXPECT().AddResults(bootstrapResults[t0Nanos]).Return(nil) @@ -772,8 +772,8 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { seg2 := segment.NewMockSegment(ctrl) seg3 := segment.NewMockSegment(ctrl) bootstrapResults := result.IndexResults{ - t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRanges(t0, t1, 1, 2, 3)), - t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRanges(t1, t2, 1, 2, 3)), + t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRangesFromRange(t0, t1, 1, 2, 3)), + t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRangesFromRange(t1, t2, 1, 2, 3)), } b0.EXPECT().AddResults(bootstrapResults[t0Nanos]).Return(nil) @@ -895,8 +895,8 @@ func TestNamespaceIndexBlockAggregateQueryReleasingContext(t *testing.T) { seg2 := segment.NewMockSegment(ctrl) seg3 := segment.NewMockSegment(ctrl) bootstrapResults := result.IndexResults{ - t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRanges(t0, t1, 1, 2, 3)), - t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRanges(t1, t2, 1, 2, 3)), + t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRangesFromRange(t0, t1, 1, 2, 3)), + t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRangesFromRange(t1, t2, 1, 2, 3)), } b0.EXPECT().AddResults(bootstrapResults[t0Nanos]).Return(nil) @@ -982,8 +982,8 @@ func TestNamespaceIndexBlockAggregateQueryAggPath(t *testing.T) { seg2 := segment.NewMockSegment(ctrl) seg3 := segment.NewMockSegment(ctrl) bootstrapResults := result.IndexResults{ - t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRanges(t0, t1, 1, 2, 3)), - t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRanges(t1, t2, 1, 2, 3)), + t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRangesFromRange(t0, t1, 1, 2, 3)), + t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRangesFromRange(t1, t2, 1, 2, 3)), } b0.EXPECT().AddResults(bootstrapResults[t0Nanos]).Return(nil) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index fc672976a3..5db4f59926 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -939,7 +939,7 @@ func (n *dbNamespace) Bootstrap( bootstrapType string, unfulfilled result.ShardTimeRanges, ) error { - shardsUnfulfilled := int64(len(unfulfilled)) + shardsUnfulfilled := int64(unfulfilled.Len()) n.metrics.unfulfilled.Inc(shardsUnfulfilled) if shardsUnfulfilled == 0 { return nil diff --git a/src/x/time/ranges.go b/src/x/time/ranges.go index 1106f54566..f4a4f9c4bf 100644 --- a/src/x/time/ranges.go +++ b/src/x/time/ranges.go @@ -26,34 +26,44 @@ import ( ) // Ranges is a collection of time ranges. -type Ranges struct { +type Ranges interface { + AddRange(Range) + AddRanges(Ranges) + RemoveRange(Range) + RemoveRanges(Ranges) + Overlaps(Range) bool + Iter() *RangeIter + Clone() Ranges + Len() int + IsEmpty() bool + String() string +} + +type ranges struct { sortedRanges *list.List } // NewRanges constructs a new Ranges object comprising the provided ranges. -func NewRanges(ranges ...Range) Ranges { - var result Ranges - for _, r := range ranges { - result = result.AddRange(r) +func NewRanges(in ...Range) Ranges { + res := &ranges{sortedRanges: list.New()} + for _, r := range in { + res.AddRange(r) } - return result + return res } // Len returns the number of ranges included. -func (tr Ranges) Len() int { - if tr.sortedRanges == nil { - return 0 - } +func (tr *ranges) Len() int { return tr.sortedRanges.Len() } // IsEmpty returns true if the list of time ranges is empty. -func (tr Ranges) IsEmpty() bool { +func (tr *ranges) IsEmpty() bool { return tr.Len() == 0 } // Overlaps checks if the range overlaps with any of the ranges in the collection. -func (tr Ranges) Overlaps(r Range) bool { +func (tr *ranges) Overlaps(r Range) bool { if r.IsEmpty() { return false } @@ -66,54 +76,53 @@ func (tr Ranges) Overlaps(r Range) bool { } // AddRange adds the time range to the collection of ranges. -func (tr Ranges) AddRange(r Range) Ranges { - res := tr.clone() - res.addRangeInPlace(r) - return res +func (tr *ranges) AddRange(r Range) { + tr.addRangeInPlace(r) } // AddRanges adds the time ranges. -func (tr Ranges) AddRanges(other Ranges) Ranges { - res := tr.clone() +func (tr *ranges) AddRanges(other Ranges) { it := other.Iter() for it.Next() { - res.addRangeInPlace(it.Value()) + tr.addRangeInPlace(it.Value()) } - return res } // RemoveRange removes the time range from the collection of ranges. -func (tr Ranges) RemoveRange(r Range) Ranges { - res := tr.clone() - res.removeRangeInPlace(r) - return res +func (tr *ranges) RemoveRange(r Range) { + tr.removeRangeInPlace(r) } // RemoveRanges removes the given time ranges from the current one. -func (tr Ranges) RemoveRanges(other Ranges) Ranges { - res := tr.clone() +func (tr *ranges) RemoveRanges(other Ranges) { it := other.Iter() for it.Next() { - res.removeRangeInPlace(it.Value()) + tr.removeRangeInPlace(it.Value()) } - return res } // Iter returns an iterator that iterates over the time ranges included. -func (tr Ranges) Iter() *RangeIter { +func (tr *ranges) Iter() *RangeIter { return newRangeIter(tr.sortedRanges) } +// Clone makes a clone of the time ranges. +func (tr *ranges) Clone() Ranges { + res := &ranges{sortedRanges: list.New()} + for e := tr.sortedRanges.Front(); e != nil; e = e.Next() { + res.sortedRanges.PushBack(e.Value.(Range)) + } + return res +} + // String returns the string representation of the range. -func (tr Ranges) String() string { +func (tr *ranges) String() string { var buf bytes.Buffer buf.WriteString("[") - if tr.sortedRanges != nil { - for e := tr.sortedRanges.Front(); e != nil; e = e.Next() { - buf.WriteString(e.Value.(Range).String()) - if e.Next() != nil { - buf.WriteString(",") - } + for e := tr.sortedRanges.Front(); e != nil; e = e.Next() { + buf.WriteString(e.Value.(Range).String()) + if e.Next() != nil { + buf.WriteString(",") } } buf.WriteString("]") @@ -121,7 +130,7 @@ func (tr Ranges) String() string { } // addRangeInPlace adds r to tr in place without creating a new copy. -func (tr Ranges) addRangeInPlace(r Range) { +func (tr *ranges) addRangeInPlace(r Range) { if r.IsEmpty() { return } @@ -144,7 +153,7 @@ func (tr Ranges) addRangeInPlace(r Range) { tr.sortedRanges.InsertBefore(r, e) } -func (tr Ranges) removeRangeInPlace(r Range) { +func (tr *ranges) removeRangeInPlace(r Range) { if r.IsEmpty() { return } @@ -169,7 +178,7 @@ func (tr Ranges) removeRangeInPlace(r Range) { } // findFirstNotBefore finds the first interval that's not before r. -func (tr Ranges) findFirstNotBefore(r Range) *list.Element { +func (tr *ranges) findFirstNotBefore(r Range) *list.Element { if tr.sortedRanges == nil { return nil } @@ -180,15 +189,3 @@ func (tr Ranges) findFirstNotBefore(r Range) *list.Element { } return nil } - -// clone returns a copy of the time ranges. -func (tr Ranges) clone() Ranges { - res := Ranges{sortedRanges: list.New()} - if tr.sortedRanges == nil { - return res - } - for e := tr.sortedRanges.Front(); e != nil; e = e.Next() { - res.sortedRanges.PushBack(e.Value.(Range)) - } - return res -} diff --git a/src/x/time/ranges_test.go b/src/x/time/ranges_test.go index 1cf26dcb21..fd810150a6 100644 --- a/src/x/time/ranges_test.go +++ b/src/x/time/ranges_test.go @@ -27,16 +27,6 @@ import ( "github.com/stretchr/testify/require" ) -func validateResult(t *testing.T, tr Ranges, expected []Range) { - l := tr.sortedRanges - require.Equal(t, len(expected), l.Len()) - idx := 0 - for e := l.Front(); e != nil; e = e.Next() { - require.Equal(t, e.Value.(Range), expected[idx]) - idx++ - } -} - func validateIter(t *testing.T, it *RangeIter, expected []Range) { idx := 0 for it.Next() { @@ -68,28 +58,30 @@ func getRangesToRemove() []Range { } func getPopulatedRanges(ranges []Range, start, end int) Ranges { - var tr Ranges + tr := NewRanges() for _, r := range ranges[start:end] { - tr = tr.AddRange(r) + tr.AddRange(r) } return tr } func TestIsEmpty(t *testing.T) { - var tr Ranges + tr := NewRanges() require.True(t, tr.IsEmpty()) - tr = tr.clone() - tr.sortedRanges.PushBack(Range{}) + tr.AddRange(getRangesToAdd()[0]) require.False(t, tr.IsEmpty()) } func TestNewRanges(t *testing.T) { rangesToAdd := getRangesToAdd() exp := getPopulatedRanges(rangesToAdd, 0, len(rangesToAdd)) + clone := exp.Clone() obs := NewRanges(rangesToAdd...) - require.True(t, exp.RemoveRanges(obs).IsEmpty()) - require.True(t, obs.RemoveRanges(exp).IsEmpty()) + exp.RemoveRanges(clone) + require.True(t, exp.IsEmpty()) + obs.RemoveRanges(clone) + require.True(t, obs.IsEmpty()) } func TestClone(t *testing.T) { @@ -97,18 +89,18 @@ func TestClone(t *testing.T) { tr := getPopulatedRanges(rangesToAdd, 0, 4) expectedResults := []Range{rangesToAdd[3], rangesToAdd[2], rangesToAdd[0], rangesToAdd[1]} - validateResult(t, tr, expectedResults) + validateIter(t, tr.Iter(), expectedResults) - cloned := tr.clone() - tr = tr.RemoveRange(rangesToAdd[0]) - validateResult(t, cloned, expectedResults) - validateResult(t, tr, []Range{rangesToAdd[3], rangesToAdd[2], rangesToAdd[1]}) + cloned := tr.Clone() + tr.RemoveRange(rangesToAdd[0]) + validateIter(t, cloned.Iter(), expectedResults) + validateIter(t, tr.Iter(), []Range{rangesToAdd[3], rangesToAdd[2], rangesToAdd[1]}) } func TestAddRange(t *testing.T) { - var tr Ranges - tr = tr.AddRange(Range{}) - validateResult(t, tr, []Range{}) + tr := NewRanges() + tr.AddRange(Range{}) + validateIter(t, tr.Iter(), []Range{}) rangestoAdd := getRangesToAdd() expectedResults := [][]Range{ @@ -121,30 +113,30 @@ func TestAddRange(t *testing.T) { {{Start: testStart.Add(-10 * time.Second), End: testStart.Add(15 * time.Second)}}, } - saved := tr + saved := tr.Clone() for i, r := range rangestoAdd { - tr = tr.AddRange(r) - validateResult(t, tr, expectedResults[i]) + tr.AddRange(r) + validateIter(t, tr.Iter(), expectedResults[i]) } - validateResult(t, saved, []Range{}) + validateIter(t, saved.Iter(), []Range{}) } func TestAddRanges(t *testing.T) { rangesToAdd := getRangesToAdd() tr := getPopulatedRanges(rangesToAdd, 0, 4) - tr = tr.AddRanges(Ranges{}) + tr.AddRanges(NewRanges()) expectedResults := []Range{rangesToAdd[3], rangesToAdd[2], rangesToAdd[0], rangesToAdd[1]} - validateResult(t, tr, expectedResults) + validateIter(t, tr.Iter(), expectedResults) tr2 := getPopulatedRanges(rangesToAdd, 4, 7) - saved := tr - tr = tr.AddRanges(tr2) + saved := tr.Clone() + tr.AddRanges(tr2) expectedResults2 := []Range{{Start: testStart.Add(-10 * time.Second), End: testStart.Add(15 * time.Second)}} - validateResult(t, tr, expectedResults2) - validateResult(t, saved, expectedResults) + validateIter(t, tr.Iter(), expectedResults2) + validateIter(t, saved.Iter(), expectedResults) } func TestRemoveRange(t *testing.T) { @@ -173,26 +165,26 @@ func TestRemoveRange(t *testing.T) { }, } - saved := tr + saved := tr.Clone() for i, r := range rangesToRemove { - tr = tr.RemoveRange(r) - validateResult(t, tr, expectedResults[i]) + tr.RemoveRange(r) + validateIter(t, tr.Iter(), expectedResults[i]) } - tr = tr.RemoveRange(Range{}) - validateResult(t, tr, expectedResults[3]) + tr.RemoveRange(Range{}) + validateIter(t, tr.Iter(), expectedResults[3]) - tr = tr.RemoveRange(Range{ + tr.RemoveRange(Range{ Start: testStart.Add(-10 * time.Second), End: testStart.Add(15 * time.Second), }) require.True(t, tr.IsEmpty()) - validateResult(t, saved, expectedResults[0]) + validateIter(t, saved.Iter(), expectedResults[0]) } func TestRemoveRanges(t *testing.T) { tr := getPopulatedRanges(getRangesToAdd(), 0, 4) - tr = tr.RemoveRanges(Ranges{}) + tr.RemoveRanges(NewRanges()) expectedResults := []Range{ {Start: testStart.Add(-8 * time.Second), End: testStart.Add(-5 * time.Second)}, @@ -200,18 +192,18 @@ func TestRemoveRanges(t *testing.T) { {Start: testStart, End: testStart.Add(time.Second)}, {Start: testStart.Add(10 * time.Second), End: testStart.Add(15 * time.Second)}, } - validateResult(t, tr, expectedResults) + validateIter(t, tr.Iter(), expectedResults) - saved := tr + saved := tr.Clone() tr2 := getPopulatedRanges(getRangesToRemove(), 0, 4) - tr = tr.RemoveRanges(tr2) + tr.RemoveRanges(tr2) expectedResults2 := []Range{ {Start: testStart.Add(-8 * time.Second), End: testStart.Add(-6 * time.Second)}, {Start: testStart.Add(13 * time.Second), End: testStart.Add(15 * time.Second)}, } - validateResult(t, tr, expectedResults2) - validateResult(t, saved, expectedResults) + validateIter(t, tr.Iter(), expectedResults2) + validateIter(t, saved.Iter(), expectedResults) } func TestOverlaps(t *testing.T) { @@ -236,15 +228,16 @@ func TestRangesIter(t *testing.T) { {Start: testStart.Add(10 * time.Second), End: testStart.Add(15 * time.Second)}, } validateIter(t, tr.Iter(), expectedResults) - tr = tr.RemoveRange(rangesToAdd[2]) + tr.RemoveRange(rangesToAdd[2]) validateIter(t, tr.Iter(), append(expectedResults[:1], expectedResults[2:]...)) } func TestRangesString(t *testing.T) { - var tr Ranges + tr := NewRanges() require.Equal(t, "[]", tr.String()) start := time.Unix(1465430400, 0).UTC() - tr = tr.AddRange(Range{Start: start, End: start.Add(2 * time.Hour)}). - AddRange(Range{Start: start.Add(4 * time.Hour), End: start.Add(5 * time.Hour)}) + tr.AddRanges(NewRanges( + Range{Start: start, End: start.Add(2 * time.Hour)}, + Range{Start: start.Add(4 * time.Hour), End: start.Add(5 * time.Hour)})) require.Equal(t, "[(2016-06-09 00:00:00 +0000 UTC,2016-06-09 02:00:00 +0000 UTC),(2016-06-09 04:00:00 +0000 UTC,2016-06-09 05:00:00 +0000 UTC)]", tr.String()) }