diff --git a/src/dbnode/persist/fs/retriever_options.go b/src/dbnode/persist/fs/retriever_options.go
index 8616dd5369..9e14d2ce94 100644
--- a/src/dbnode/persist/fs/retriever_options.go
+++ b/src/dbnode/persist/fs/retriever_options.go
@@ -22,6 +22,7 @@ package fs
 
 import (
 	"errors"
+	"runtime"
 
 	"github.com/m3db/m3/src/dbnode/storage/block"
 	"github.com/m3db/m3/src/dbnode/x/xio"
@@ -30,11 +31,12 @@ import (
 )
 
 const (
-	defaultRequestPoolSize  = 16384
-	defaultFetchConcurrency = 2
+	defaultRequestPoolSize = 16384
 )
 
 var (
+	// Allow max concurrency to match available CPUs.
+	defaultFetchConcurrency    = runtime.NumCPU()
 	errBlockLeaseManagerNotSet = errors.New("block lease manager is not set")
 )
 
diff --git a/src/dbnode/persist/fs/seek_manager_test.go b/src/dbnode/persist/fs/seek_manager_test.go
index 3459e1b90a..1f63ddde33 100644
--- a/src/dbnode/persist/fs/seek_manager_test.go
+++ b/src/dbnode/persist/fs/seek_manager_test.go
@@ -34,9 +34,16 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+const (
+	defaultTestingFetchConcurrency = 2
+)
+
 var (
 	defaultTestBlockRetrieverOptions = NewBlockRetrieverOptions().
-		SetBlockLeaseManager(&block.NoopLeaseManager{})
+		SetBlockLeaseManager(&block.NoopLeaseManager{}).
+		// Default value is determined by available CPUs, but for testing
+		// we want it to be consistent across hardware.
+		SetFetchConcurrency(defaultTestingFetchConcurrency)
 )
 
 func TestSeekerManagerCacheShardIndices(t *testing.T) {
@@ -44,18 +51,20 @@ func TestSeekerManagerCacheShardIndices(t *testing.T) {
 	shards := []uint32{2, 5, 9, 478, 1023}
 	m := NewSeekerManager(nil, testDefaultOpts, defaultTestBlockRetrieverOptions).(*seekerManager)
 
-	var byTimes []*seekersByTime
+	byTimes := make(map[uint32]*seekersByTime)
+	var mu sync.Mutex
 	m.openAnyUnopenSeekersFn = func(byTime *seekersByTime) error {
-		byTimes = append(byTimes, byTime)
+		mu.Lock()
+		byTimes[byTime.shard] = byTime
+		mu.Unlock()
 		return nil
 	}
 
 	require.NoError(t, m.CacheShardIndices(shards))
 
-	// Assert captured byTime objects match expectations
 	require.Equal(t, len(shards), len(byTimes))
-	for idx, shard := range shards {
-		byTimes[idx].shard = shard
+	for _, shard := range shards {
+		byTimes[shard].shard = shard
 	}
 
 	// Assert seeksByShardIdx match expectations
@@ -63,13 +72,14 @@ func TestSeekerManagerCacheShardIndices(t *testing.T) {
 	for _, shard := range shards {
 		shardSet[shard] = struct{}{}
 	}
+
 	for shard, byTime := range m.seekersByShardIdx {
 		_, exists := shardSet[uint32(shard)]
 		if !exists {
 			require.False(t, byTime.accessed)
 		} else {
 			require.True(t, byTime.accessed)
-			require.Equal(t, uint32(shard), byTime.shard)
+			require.Equal(t, int(shard), int(byTime.shard))
 		}
 	}
 }
@@ -96,10 +106,10 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
 		mock := NewMockDataFileSetSeeker(ctrl)
 		// ConcurrentClone() will be called fetchConcurrency-1 times because the original can be used
 		// as one of the clones.
-		for i := 0; i < defaultFetchConcurrency-1; i++ {
+		for i := 0; i < defaultTestingFetchConcurrency-1; i++ {
 			mock.EXPECT().ConcurrentClone().Return(mock, nil)
 		}
-		for i := 0; i < defaultFetchConcurrency; i++ {
+		for i := 0; i < defaultTestingFetchConcurrency; i++ {
 			mock.EXPECT().Close().DoAndReturn(func() error {
 				mockSeekerStatsLock.Lock()
 				numMockSeekerCloses++
@@ -125,7 +135,7 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
 		byTime := m.seekersByTime(shard)
 		byTime.RLock()
 		seekers := byTime.seekers[xtime.ToUnixNano(blockStart)]
-		require.Equal(t, defaultFetchConcurrency, len(seekers.active.seekers))
+		require.Equal(t, defaultTestingFetchConcurrency, len(seekers.active.seekers))
 		require.Equal(t, 0, seekers.active.volume)
 		byTime.RUnlock()
 		require.NoError(t, m.Return(shard, blockStart, seeker))
@@ -144,13 +154,13 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
 		byTime := m.seekersByTime(shard)
 		byTime.RLock()
 		seekers := byTime.seekers[xtime.ToUnixNano(blockStart)]
-		require.Equal(t, defaultFetchConcurrency, len(seekers.active.seekers))
+		require.Equal(t, defaultTestingFetchConcurrency, len(seekers.active.seekers))
 		require.Equal(t, 1, seekers.active.volume)
 		byTime.RUnlock()
 	}
 	// Ensure that the old seekers actually get closed.
 	mockSeekerStatsLock.Lock()
-	require.Equal(t, len(shards)*defaultFetchConcurrency, numMockSeekerCloses)
+	require.Equal(t, len(shards)*defaultTestingFetchConcurrency, numMockSeekerCloses)
 	mockSeekerStatsLock.Unlock()
 
 	// Ensure that UpdateOpenLease() ignores updates for the wrong namespace.
@@ -166,7 +176,7 @@ func TestSeekerManagerUpdateOpenLease(t *testing.T) {
 		byTime := m.seekersByTime(shard)
 		byTime.RLock()
 		seekers := byTime.seekers[xtime.ToUnixNano(blockStart)]
-		require.Equal(t, defaultFetchConcurrency, len(seekers.active.seekers))
+		require.Equal(t, defaultTestingFetchConcurrency, len(seekers.active.seekers))
 		// Should not have increased to 2.
 		require.Equal(t, 1, seekers.active.volume)
 		byTime.RUnlock()
@@ -202,7 +212,7 @@ func TestSeekerManagerBorrowOpenSeekersLazy(t *testing.T) {
 		mock := NewMockDataFileSetSeeker(ctrl)
 		mock.EXPECT().Open(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
 		mock.EXPECT().ConcurrentClone().Return(mock, nil)
-		for i := 0; i < defaultFetchConcurrency; i++ {
+		for i := 0; i < defaultTestingFetchConcurrency; i++ {
 			mock.EXPECT().Close().Return(nil)
 			mock.EXPECT().ConcurrentIDBloomFilter().Return(nil)
 		}
@@ -220,7 +230,7 @@ func TestSeekerManagerBorrowOpenSeekersLazy(t *testing.T) {
 		byTime := m.seekersByTime(shard)
 		byTime.RLock()
 		seekers := byTime.seekers[xtime.ToUnixNano(time.Time{})]
-		require.Equal(t, defaultFetchConcurrency, len(seekers.active.seekers))
+		require.Equal(t, defaultTestingFetchConcurrency, len(seekers.active.seekers))
 		byTime.RUnlock()
 		require.NoError(t, m.Return(shard, time.Time{}, seeker))
 	}
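For context on how callers interact with the new default, below is a minimal sketch (not part of the diff) showing how fetch concurrency can be pinned explicitly instead of inheriting the CPU-derived default. `NewBlockRetrieverOptions`, `SetBlockLeaseManager`, `SetFetchConcurrency`, and `block.NoopLeaseManager` are taken from the diff above; the `newRetrieverOptions` helper and its `deterministic` flag are hypothetical.

```go
package example

import (
	"runtime"

	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/dbnode/storage/block"
)

// newRetrieverOptions is a hypothetical helper. With deterministic=true it
// pins the fetch concurrency to a fixed value, mirroring what the test file
// does via defaultTestingFetchConcurrency; with deterministic=false it uses
// runtime.NumCPU(), which is what the new defaultFetchConcurrency in
// retriever_options.go evaluates to on the host.
func newRetrieverOptions(deterministic bool) fs.BlockRetrieverOptions {
	concurrency := runtime.NumCPU()
	if deterministic {
		// Mirrors defaultTestingFetchConcurrency in seek_manager_test.go.
		concurrency = 2
	}
	return fs.NewBlockRetrieverOptions().
		SetBlockLeaseManager(&block.NoopLeaseManager{}).
		SetFetchConcurrency(concurrency)
}
```

Pinning the value in tests keeps expectations such as `len(shards)*defaultTestingFetchConcurrency` stable regardless of how many CPUs the CI machine has.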