From 3a934411c37d54f308729085078d590c543dda75 Mon Sep 17 00:00:00 2001 From: Joris Dral Date: Tue, 13 Aug 2024 17:12:48 +0200 Subject: [PATCH] Replace IORef by MutVar As a result, quite a few types are now parameterised over a state parameter `s` (e.g., `RealWorld` for `IO`). This change causes quite a bit of noise, but it's necessary for generalising all our code away from just `IO`. I have also considered whether it would be better to parameterise these types by a monad `m` instead of the state type `s`. An `m` parameter is slightly more general, since we can use `PrimState m` whenever `s` would be required. In the spirit of the YAGNI principle, I decided not to do that (for now). We can make that change later if it turns out to be necessary. --- bench/macro/lsm-tree-bench-lookups.hs | 4 +- .../Bench/Database/LSMTree/Internal/Lookup.hs | 4 +- .../Bench/Database/LSMTree/Internal/Merge.hs | 7 +- .../Database/LSMTree/Internal/WriteBuffer.hs | 3 +- src/Database/LSMTree/Common.hs | 3 +- src/Database/LSMTree/Internal.hs | 112 +++++++++--------- src/Database/LSMTree/Internal/Lookup.hs | 18 +-- src/Database/LSMTree/Internal/Merge.hs | 29 ++--- src/Database/LSMTree/Internal/Run.hs | 44 +++---- src/Database/LSMTree/Internal/RunBuilder.hs | 61 +++++----- src/Database/LSMTree/Internal/RunReader.hs | 32 ++--- src/Database/LSMTree/Internal/RunReaders.hs | 48 ++++---- test/Test/Database/LSMTree/Internal/Merge.hs | 6 +- test/Test/Database/LSMTree/Internal/Run.hs | 10 +- .../Database/LSMTree/Internal/RunReaders.hs | 9 +- 15 files changed, 202 insertions(+), 188 deletions(-) diff --git a/bench/macro/lsm-tree-bench-lookups.hs b/bench/macro/lsm-tree-bench-lookups.hs index 775bcb075..5c1648a24 100644 --- a/bench/macro/lsm-tree-bench-lookups.hs +++ b/bench/macro/lsm-tree-bench-lookups.hs @@ -318,7 +318,7 @@ lookupsEnv :: -> FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO -> Run.RunDataCaching - -> IO ( V.Vector (Run (FS.Handle FS.HandleIO)) + -> IO ( V.Vector (Run RealWorld (FS.Handle FS.HandleIO)) , V.Vector (Bloom SerialisedKey) , V.Vector IndexCompact , V.Vector (FS.Handle FS.HandleIO) @@ -452,7 +452,7 @@ benchLookupsIO :: FS.HasBlockIO IO h -> ArenaManager RealWorld -> ResolveSerialisedValue - -> V.Vector (Run (FS.Handle h)) + -> V.Vector (Run RealWorld (FS.Handle h)) -> V.Vector (Bloom SerialisedKey) -> V.Vector IndexCompact -> V.Vector (FS.Handle h) diff --git a/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs b/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs index d2e7a918c..840ce2a62 100644 --- a/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs +++ b/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs @@ -166,7 +166,7 @@ lookupsInBatchesEnv :: , ArenaManager RealWorld , FS.HasFS IO FS.HandleIO , FS.HasBlockIO IO FS.HandleIO - , V.Vector (Run (FS.Handle FS.HandleIO)) + , V.Vector (Run RealWorld (FS.Handle FS.HandleIO)) , V.Vector SerialisedKey ) lookupsInBatchesEnv Config {..} = do @@ -197,7 +197,7 @@ lookupsInBatchesCleanup :: , ArenaManager RealWorld , FS.HasFS IO FS.HandleIO , FS.HasBlockIO IO FS.HandleIO - , V.Vector (Run (FS.Handle FS.HandleIO)) + , V.Vector (Run RealWorld (FS.Handle FS.HandleIO)) , V.Vector SerialisedKey ) -> IO () diff --git a/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs b/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs index df0c8c869..8e0afddd2 100644 --- a/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs +++ b/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs @@ -1,6 +1,7 @@ module Bench.Database.LSMTree.Internal.Merge (benchmarks)
where import Control.Monad (when, zipWithM) +import Control.Monad.Primitive import Criterion.Main (Benchmark, bench, bgroup) import qualified Criterion.Main as Cr import Data.Bifunctor (first) @@ -226,7 +227,7 @@ merge :: -> Config -> Run.RunFsPaths -> InputRuns - -> IO (Run (FS.Handle (FS.HandleIO))) + -> IO (Run RealWorld (FS.Handle (FS.HandleIO))) merge fs hbio Config {..} targetPaths runs = do let f = fromMaybe const mergeMappend m <- fromMaybe (error "empty inputs, no merge created") <$> @@ -244,7 +245,7 @@ outputRunPaths = RunFsPaths (FS.mkFsPath []) 0 inputRunPaths :: [Run.RunFsPaths] inputRunPaths = RunFsPaths (FS.mkFsPath []) <$> [1..] -type InputRuns = [Run (FS.Handle FS.HandleIO)] +type InputRuns = [Run RealWorld (FS.Handle FS.HandleIO)] type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue @@ -360,7 +361,7 @@ createRun :: -> Maybe Mappend -> Run.RunFsPaths -> [SerialisedKOp] - -> IO (Run (FS.Handle h)) + -> IO (Run RealWorld (FS.Handle h)) createRun hasFS hasBlockIO mMappend targetPath = Run.fromWriteBuffer hasFS hasBlockIO Run.CacheRunData (RunAllocFixed 10) targetPath . Fold.foldl insert WB.empty diff --git a/bench/micro/Bench/Database/LSMTree/Internal/WriteBuffer.hs b/bench/micro/Bench/Database/LSMTree/Internal/WriteBuffer.hs index a7d31f2a1..bcc547df0 100644 --- a/bench/micro/Bench/Database/LSMTree/Internal/WriteBuffer.hs +++ b/bench/micro/Bench/Database/LSMTree/Internal/WriteBuffer.hs @@ -1,6 +1,7 @@ module Bench.Database.LSMTree.Internal.WriteBuffer (benchmarks) where import Control.DeepSeq (NFData (..)) +import Control.Monad.Primitive import Criterion.Main (Benchmark, bench, bgroup) import qualified Criterion.Main as Cr import Data.Bifunctor (first) @@ -166,7 +167,7 @@ flush :: FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO -> RunFsPaths -> WriteBuffer - -> IO (Run (FS.Handle (FS.HandleIO))) + -> IO (Run RealWorld (FS.Handle (FS.HandleIO))) flush hfs hbio = Run.fromWriteBuffer hfs hbio Run.CacheRunData (RunAllocFixed 10) data InputKOps diff --git a/src/Database/LSMTree/Common.hs b/src/Database/LSMTree/Common.hs index ec4f4fdd0..a2f64ac19 100644 --- a/src/Database/LSMTree/Common.hs +++ b/src/Database/LSMTree/Common.hs @@ -29,6 +29,7 @@ import Control.Concurrent.Class.MonadMVar.Strict import Control.Concurrent.Class.MonadSTM (MonadSTM, STM) import Control.DeepSeq import Control.Monad.Class.MonadThrow +import Control.Monad.Primitive (PrimMonad (..)) import Data.Kind (Type) import Data.Typeable (Proxy, Typeable) import qualified Database.LSMTree.Internal as Internal @@ -216,4 +217,4 @@ listSnapshots (Session sesh) = Internal.listSnapshots sesh -- TODO: get rid of the @m@ parameter? type BlobRef :: (Type -> Type) -> Type -> Type type role BlobRef nominal nominal -data BlobRef m blob = forall h. Typeable h => BlobRef (Internal.BlobRef (Internal.Run h)) +data BlobRef m blob = forall h. Typeable h => BlobRef (Internal.BlobRef (Internal.Run (PrimState m) h)) diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 2be544104..20f649241 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} -- | TODO: this should be removed once we have proper snapshotting with proper -- persistence of the config to disk. 
@@ -63,8 +64,8 @@ import qualified Control.Concurrent.Class.MonadSTM.RWVar as RW import Control.DeepSeq import Control.Monad (unless, void, when) import Control.Monad.Class.MonadThrow -import Control.Monad.Primitive (PrimState, RealWorld) -import Control.Monad.ST.Strict (ST, runST) +import Control.Monad.Primitive (PrimState) +import Control.Monad.ST.Strict import Data.Arena (ArenaManager, newArenaManager) import Data.Bifunctor (Bifunctor (..)) import Data.BloomFilter (Bloom) @@ -419,7 +420,7 @@ data TableHandleEnv m h = TableHandleEnv { -- waiting for the MVar. -- -- TODO: switch to more fine-grained synchronisation approach - , tableContent :: !(RWVar m (TableContent h)) + , tableContent :: !(RWVar m (TableContent (PrimState m) h)) } {-# INLINE tableSessionRoot #-} @@ -449,18 +450,18 @@ tableSessionUntrackTable :: MonadMVar m => TableHandleEnv m h -> m () tableSessionUntrackTable thEnv = modifyMVar_ (sessionOpenTables (tableSessionEnv thEnv)) $ pure . Map.delete (tableId thEnv) -data TableContent h = TableContent { +data TableContent s h = TableContent { tableWriteBuffer :: !WriteBuffer -- | A hierarchy of levels. The vector indexes double as level numbers. - , tableLevels :: !(Levels (Handle h)) + , tableLevels :: !(Levels s (Handle h)) -- | Cache of flattened 'levels'. -- -- INVARIANT: when 'level's is modified, this cache should be updated as -- well, for example using 'mkLevelsCache'. - , tableCache :: !(LevelsCache (Handle h)) + , tableCache :: !(LevelsCache s (Handle h)) } -emptyTableContent :: TableContent h +emptyTableContent :: TableContent s h emptyTableContent = TableContent { tableWriteBuffer = WB.empty , tableLevels = V.empty @@ -481,45 +482,45 @@ withOpenTable th action = RW.withReadAccess (tableHandleState th) $ \case TableHandleClosed -> throwIO ErrTableClosed TableHandleOpen thEnv -> action thEnv -type Levels h = V.Vector (Level h) +type Levels s h = V.Vector (Level s h) -- | Runs in order from newer to older -data Level h = Level { - incomingRuns :: !(MergingRun h) - , residentRuns :: !(V.Vector (Run h)) +data Level s h = Level { + incomingRuns :: !(MergingRun s h) + , residentRuns :: !(V.Vector (Run s h)) } -- TODO: proper instance -deriving via OnlyCheckWhnfNamed "Level" (Level h) instance NoThunks (Level h) +deriving via OnlyCheckWhnfNamed "Level" (Level s h) instance NoThunks (Level s h) newtype LevelNo = LevelNo Int deriving stock Eq deriving newtype Enum -- | A merging run is either a single run, or some ongoing merge. -data MergingRun h = - MergingRun !(MergingRunState h) - | SingleRun !(Run h) +data MergingRun s h = + MergingRun !(MergingRunState s h) + | SingleRun !(Run s h) -- | Merges are stepped to completion immediately, so there is no representation -- for ongoing merges (yet) -- -- TODO: this should also represent ongoing merges once we implement scheduling. -newtype MergingRunState h = CompletedMerge (Run h) +newtype MergingRunState s h = CompletedMerge (Run s h) -- | Return all runs in the levels, ordered from newest to oldest -runsInLevels :: Levels h -> V.Vector (Run h) +runsInLevels :: Levels s h -> V.Vector (Run s h) runsInLevels levels = flip V.concatMap levels $ \(Level mr rs) -> case mr of SingleRun r -> r `V.cons` rs MergingRun (CompletedMerge r) -> r `V.cons` rs -{-# SPECIALISE closeLevels :: HasFS IO h -> HasBlockIO IO h -> Levels (Handle h) -> IO () #-} +{-# SPECIALISE closeLevels :: HasFS IO h -> HasBlockIO IO h -> Levels RealWorld (Handle h) -> IO () #-} closeLevels :: m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation. 
=> HasFS m h -> HasBlockIO m h - -> Levels (Handle h) + -> Levels (PrimState m) (Handle h) -> m () closeLevels hfs hbio levels = V.mapM_ (Run.removeReference hfs hbio) (runsInLevels levels) @@ -532,8 +533,8 @@ closeLevels hfs hbio levels = V.mapM_ (Run.removeReference hfs hbio) (runsInLeve -- -- Use 'mkLevelsCache' to ensure that there are no mismatches between the vector -- of runs and the vectors of run components. -data LevelsCache h = LevelsCache_ { - cachedRuns :: !(V.Vector (Run h)) +data LevelsCache s h = LevelsCache_ { + cachedRuns :: !(V.Vector (Run s h)) , cachedFilters :: !(V.Vector (Bloom SerialisedKey)) , cachedIndexes :: !(V.Vector IndexCompact) , cachedKOpsFiles :: !(V.Vector h) @@ -541,7 +542,7 @@ data LevelsCache h = LevelsCache_ { -- | Flatten the argument 'Level's into a single vector of runs, and use that to -- populate the 'LevelsCache'. -mkLevelsCache :: Levels h -> LevelsCache h +mkLevelsCache :: Levels s h -> LevelsCache s h mkLevelsCache lvls = LevelsCache_ { cachedRuns = rs , cachedFilters = V.map Run.runFilter rs @@ -576,7 +577,7 @@ new sesh conf = withOpenSession sesh $ \seshEnv -> do am <- newArenaManager newWith sesh seshEnv conf am WB.empty V.empty -{-# SPECIALISE newWith :: Session IO h -> SessionEnv IO h -> TableConfig -> ArenaManager RealWorld -> WriteBuffer -> Levels (Handle h) -> IO (TableHandle IO h) #-} +{-# SPECIALISE newWith :: Session IO h -> SessionEnv IO h -> TableConfig -> ArenaManager RealWorld -> WriteBuffer -> Levels RealWorld (Handle h) -> IO (TableHandle IO h) #-} newWith :: m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation. => Session m h @@ -584,7 +585,7 @@ newWith :: -> TableConfig -> ArenaManager (PrimState m) -> WriteBuffer - -> Levels (Handle h) + -> Levels (PrimState m) (Handle h) -> m (TableHandle m h) newWith sesh seshEnv conf !am !wb !levels = do assertNoThunks levels $ pure () @@ -623,14 +624,14 @@ close th = RW.withWriteAccess_ (tableHandleState th) $ \case pure emptyTableContent pure TableHandleClosed -{-# SPECIALISE lookups :: ResolveSerialisedValue -> V.Vector SerialisedKey -> TableHandle IO h -> (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h)))) -> lookupResult) -> IO (V.Vector lookupResult) #-} +{-# SPECIALISE lookups :: ResolveSerialisedValue -> V.Vector SerialisedKey -> TableHandle IO h -> (Maybe (Entry SerialisedValue (BlobRef (Run RealWorld (Handle h)))) -> lookupResult) -> IO (V.Vector lookupResult) #-} -- | See 'Database.LSMTree.Normal.lookups'. lookups :: m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation. => ResolveSerialisedValue -> V.Vector SerialisedKey -> TableHandle m h - -> (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h)))) -> lookupResult) + -> (Maybe (Entry SerialisedValue (BlobRef (Run (PrimState m) (Handle h)))) -> lookupResult) -- ^ How to map from an entry to a lookup result. 
-> m (V.Vector lookupResult) lookups resolve ks th fromEntry = withOpenTable th $ \thEnv -> do @@ -681,7 +682,7 @@ updates resolve es th = do assertNoThunks tc' $ pure () pure tc' -{-# SPECIALISE updatesWithInterleavedFlushes :: TableConfig -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> TempRegistry IO -> TableContent h -> IO (TableContent h) #-} +{-# SPECIALISE updatesWithInterleavedFlushes :: TableConfig -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> TempRegistry IO -> TableContent RealWorld h -> IO (TableContent RealWorld h) #-} -- | A single batch of updates can fill up the write buffer multiple times. We -- flush the write buffer each time it fills up before trying to fill it up -- again. @@ -707,7 +708,7 @@ updates resolve es th = do -- and write those to disk. Of course, any remainder that did not fit into a -- whole run should then end up in a fresh write buffer. updatesWithInterleavedFlushes :: - m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation. + forall m h. m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation. => TableConfig -> ResolveSerialisedValue -> HasFS m h @@ -716,8 +717,8 @@ updatesWithInterleavedFlushes :: -> UniqCounter m -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> TempRegistry m - -> TableContent h - -> m (TableContent h) + -> TableContent (PrimState m) h + -> m (TableContent (PrimState m) h) updatesWithInterleavedFlushes conf resolve hfs hbio root uniqC es reg tc = do let wb = tableWriteBuffer tc (wb', es') = WB.addEntriesUpToN resolve es maxn wb @@ -742,14 +743,14 @@ updatesWithInterleavedFlushes conf resolve hfs hbio root uniqC es reg tc = do updatesWithInterleavedFlushes conf resolve hfs hbio root uniqC es' reg tc'' where AllocNumEntries (NumEntries maxn) = confWriteBufferAlloc conf - setWriteBuffer :: WriteBuffer -> TableContent h -> TableContent h + setWriteBuffer :: WriteBuffer -> TableContent (PrimState m) h -> TableContent (PrimState m) h setWriteBuffer wbToSet tc0 = TableContent { tableWriteBuffer = wbToSet , tableLevels = tableLevels tc0 , tableCache = tableCache tc0 } -{-# SPECIALISE flushWriteBuffer :: TableConfig -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> TempRegistry IO -> TableContent h -> IO (TableContent h) #-} +{-# SPECIALISE flushWriteBuffer :: TableConfig -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> TempRegistry IO -> TableContent RealWorld h -> IO (TableContent RealWorld h) #-} -- | Flush the write buffer to disk, regardless of whether it is full or not. -- -- The returned table content contains an updated set of levels, where the write @@ -763,8 +764,8 @@ flushWriteBuffer :: -> SessionRoot -> UniqCounter m -> TempRegistry m - -> TableContent h - -> m (TableContent h) + -> TableContent (PrimState m) h + -> m (TableContent (PrimState m) h) flushWriteBuffer conf@TableConfig{confDiskCachePolicy} resolve hfs hbio root uniqC reg tc | WB.null (tableWriteBuffer tc) = pure tc @@ -787,14 +788,14 @@ flushWriteBuffer conf@TableConfig{confDiskCachePolicy} -- | Note that the invariants rely on the fact that levelling is only used on -- the last level. -- -levelsInvariant :: forall s h. 
TableConfig -> Levels h -> ST s Bool -levelsInvariant conf levels = +_levelsInvariant :: forall s h. TableConfig -> Levels s h -> ST s Bool +_levelsInvariant conf levels = go (LevelNo 1) levels >>= \ !_ -> pure True where sr = confSizeRatio conf wba = confWriteBufferAlloc conf - go :: LevelNo -> Levels h -> ST s () + go :: LevelNo -> Levels s h -> ST s () go !_ (V.uncons -> Nothing) = pure () go !ln (V.uncons -> Just (Level mr rs, ls)) = do @@ -859,7 +860,7 @@ levelsInvariant conf levels = -- Check that a run is too small for next levels fitsUB policy r ln = Run.runNumEntries r <= maxRunSize sr wba policy ln -{-# SPECIALISE addRunToLevels :: TableConfig -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> Run (Handle h) -> TempRegistry IO -> Levels (Handle h) -> IO (Levels (Handle h)) #-} +{-# SPECIALISE addRunToLevels :: TableConfig -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> Run RealWorld (Handle h) -> TempRegistry IO -> Levels RealWorld (Handle h) -> IO (Levels RealWorld (Handle h)) #-} addRunToLevels :: forall m h. m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation. => TableConfig @@ -868,13 +869,16 @@ addRunToLevels :: -> HasBlockIO m h -> SessionRoot -> UniqCounter m - -> Run (Handle h) + -> Run (PrimState m) (Handle h) -> TempRegistry m - -> Levels (Handle h) - -> m (Levels (Handle h)) + -> Levels (PrimState m) (Handle h) + -> m (Levels (PrimState m) (Handle h)) addRunToLevels conf@TableConfig{..} resolve hfs hbio root uniqC r0 reg levels = do ls' <- go (LevelNo 1) (V.singleton r0) levels - assert (runST $ levelsInvariant conf ls') $ return ls' +#ifdef NO_IGNORE_ASSERTS + void $ stToIO $ _levelsInvariant conf ls' +#endif + return ls' where -- NOTE: @go@ is based on the @increment@ function from the -- @ScheduledMerges@ prototype. @@ -924,7 +928,7 @@ addRunToLevels conf@TableConfig{..} resolve hfs hbio root uniqC r0 reg levels = mr' <- newMerge LevelLevelling Merge.LastLevel ln (rs' `V.snoc` r) pure $! 
Level mr' V.empty `V.cons` V.empty - expectCompletedMerge :: MergingRun (Handle h) -> m (Run (Handle h)) + expectCompletedMerge :: MergingRun (PrimState m) (Handle h) -> m (Run (PrimState m) (Handle h)) expectCompletedMerge (SingleRun r) = pure r expectCompletedMerge (MergingRun (CompletedMerge r)) = pure r @@ -933,8 +937,8 @@ addRunToLevels conf@TableConfig{..} resolve hfs hbio root uniqC r0 reg levels = newMerge :: MergePolicyForLevel -> Merge.Level -> LevelNo - -> V.Vector (Run (Handle h)) - -> m (MergingRun (Handle h)) + -> V.Vector (Run (PrimState m) (Handle h)) + -> m (MergingRun (PrimState m) (Handle h)) newMerge mergepolicy mergelast ln rs | Just (r, rest) <- V.uncons rs , V.null rest = do @@ -951,7 +955,7 @@ addRunToLevels conf@TableConfig{..} resolve hfs hbio root uniqC r0 reg levels = data MergePolicyForLevel = LevelTiering | LevelLevelling -mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels h -> MergePolicyForLevel +mergePolicyForLevel :: MergePolicy -> LevelNo -> Levels s h -> MergePolicyForLevel mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels | n == 1 , V.null nextLevels @@ -959,7 +963,7 @@ mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels | V.null nextLevels = LevelLevelling -- levelling on last level | otherwise = LevelTiering -runSize :: Run h -> NumEntries +runSize :: Run s h -> NumEntries runSize run = Run.runNumEntries run -- $setup @@ -1000,15 +1004,15 @@ maxRunSize' :: TableConfig -> MergePolicyForLevel -> LevelNo -> NumEntries maxRunSize' config policy ln = maxRunSize (confSizeRatio config) (confWriteBufferAlloc config) policy ln -mergeLastForLevel :: Levels s -> Merge.Level +mergeLastForLevel :: Levels s h -> Merge.Level mergeLastForLevel levels | V.null levels = Merge.LastLevel | otherwise = Merge.MidLevel -levelIsFull :: SizeRatio -> V.Vector (Run h) -> Bool +levelIsFull :: SizeRatio -> V.Vector (Run s h) -> Bool levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr) -{-# SPECIALISE mergeRuns :: ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> RunDataCaching -> RunBloomFilterAlloc -> MergePolicyForLevel -> Merge.Level -> V.Vector (Run (Handle h)) -> IO (Run (Handle h)) #-} +{-# SPECIALISE mergeRuns :: ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO -> RunDataCaching -> RunBloomFilterAlloc -> MergePolicyForLevel -> Merge.Level -> V.Vector (Run RealWorld (Handle h)) -> IO (Run RealWorld (Handle h)) #-} mergeRuns :: m ~ IO => ResolveSerialisedValue @@ -1020,8 +1024,8 @@ mergeRuns :: -> RunBloomFilterAlloc -> MergePolicyForLevel -> Merge.Level - -> V.Vector (Run (Handle h)) - -> m (Run (Handle h)) + -> V.Vector (Run (PrimState m) (Handle h)) + -> m (Run (PrimState m) (Handle h)) mergeRuns resolve hfs hbio root uniqC caching alloc _ mergeLevel runs = do n <- incrUniqCounter uniqC let runPaths = Paths.runPath root n @@ -1164,7 +1168,7 @@ configOverrideDiskCachePolicy pol = TableConfigOverride { confOverrideDiskCachePolicy = Last (Just pol) } -{-# SPECIALISE openLevels :: HasFS IO h -> HasBlockIO IO h -> DiskCachePolicy -> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths) -> Managed IO (Levels (FS.Handle h)) #-} +{-# SPECIALISE openLevels :: HasFS IO h -> HasBlockIO IO h -> DiskCachePolicy -> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths) -> Managed IO (Levels RealWorld (FS.Handle h)) #-} -- | Open multiple levels. 
-- -- If an error occurs when opening multiple runs in sequence, then we have to @@ -1183,7 +1187,7 @@ openLevels :: -> HasBlockIO m h -> DiskCachePolicy -> V.Vector ((Bool, RunFsPaths), V.Vector RunFsPaths) - -> Managed m (Levels (Handle h)) + -> Managed m (Levels (PrimState m) (Handle h)) openLevels hfs hbio diskCachePolicy levels = flip V.imapMStrict levels $ \i (mrPath, rsPaths) -> do let ln = LevelNo (i+1) -- level 0 is the write buffer diff --git a/src/Database/LSMTree/Internal/Lookup.hs b/src/Database/LSMTree/Internal/Lookup.hs index fcdaf7bd1..b01b211fd 100644 --- a/src/Database/LSMTree/Internal/Lookup.hs +++ b/src/Database/LSMTree/Internal/Lookup.hs @@ -217,12 +217,12 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy { HasBlockIO IO h -> ArenaManager RealWorld -> ResolveSerialisedValue - -> V.Vector (Run (Handle h)) + -> V.Vector (Run RealWorld (Handle h)) -> V.Vector (Bloom SerialisedKey) -> V.Vector IndexCompact -> V.Vector (Handle h) -> V.Vector SerialisedKey - -> IO (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h)))))) + -> IO (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run RealWorld (Handle h)))))) #-} -- | Batched lookups in I\/O. -- @@ -235,12 +235,12 @@ lookupsIO :: => HasBlockIO m h -> ArenaManager (PrimState m) -> ResolveSerialisedValue - -> V.Vector (Run (Handle h)) -- ^ Runs @rs@ + -> V.Vector (Run (PrimState m) (Handle h)) -- ^ Runs @rs@ -> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@ -> V.Vector IndexCompact -- ^ The indexes inside @rs@ -> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@ -> V.Vector SerialisedKey - -> m (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h)))))) + -> m (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (PrimState m) (Handle h)))))) lookupsIO !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks = assert precondition $ withArena mgr $ \arena -> do (rkixs, ioops) <- Class.stToIO $ prepLookups arena blooms indexes kopsFiles ks ioress <- submitIO hbio ioops @@ -256,12 +256,12 @@ lookupsIO !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks = assert prec {-# SPECIALIZE intraPageLookups :: ResolveSerialisedValue - -> V.Vector (Run (Handle h)) + -> V.Vector (Run RealWorld (Handle h)) -> V.Vector SerialisedKey -> VU.Vector (RunIx, KeyIx) -> V.Vector (IOOp RealWorld h) -> VU.Vector IOResult - -> IO (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h)))))) + -> IO (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run RealWorld (Handle h)))))) #-} -- | Intra-page lookups. -- @@ -271,12 +271,12 @@ lookupsIO !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks = assert prec intraPageLookups :: forall m h. 
(PrimMonad m, MonadThrow m) => ResolveSerialisedValue - -> V.Vector (Run (Handle h)) + -> V.Vector (Run (PrimState m) (Handle h)) -> V.Vector SerialisedKey -> VU.Vector (RunIx, KeyIx) -> V.Vector (IOOp (PrimState m) h) -> VU.Vector IOResult - -> m (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h)))))) + -> m (V.Vector (Maybe (Entry SerialisedValue (BlobRef (Run (PrimState m) (Handle h)))))) intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress = do res <- VM.replicate (V.length ks) Nothing loop res 0 @@ -285,7 +285,7 @@ intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress = do !n = V.length ioops loop :: - VM.MVector (PrimState m) (Maybe (Entry SerialisedValue (BlobRef (Run (Handle h))))) + VM.MVector (PrimState m) (Maybe (Entry SerialisedValue (BlobRef (Run (PrimState m) (Handle h))))) -> Int -> m () loop !res !ioopix diff --git a/src/Database/LSMTree/Internal/Merge.hs b/src/Database/LSMTree/Internal/Merge.hs index c257bbb52..52f8d02d9 100644 --- a/src/Database/LSMTree/Internal/Merge.hs +++ b/src/Database/LSMTree/Internal/Merge.hs @@ -10,6 +10,7 @@ module Database.LSMTree.Internal.Merge ( import Control.Exception (assert) import Control.Monad (when) +import Control.Monad.Primitive (RealWorld) import Data.Coerce (coerce) import Data.Traversable (for) import Database.LSMTree.Internal.BlobRef (BlobRef (..)) @@ -33,11 +34,11 @@ import System.FS.BlockIO.API (HasBlockIO) -- -- TODO: Reference counting will have to be done somewhere, either here or in -- the layer above. -data Merge fhandle = Merge { +data Merge s fhandle = Merge { mergeLevel :: !Level , mergeMappend :: !Mappend - , mergeReaders :: {-# UNPACK #-} !(Readers.Readers fhandle) - , mergeBuilder :: !(RunBuilder fhandle) + , mergeReaders :: {-# UNPACK #-} !(Readers.Readers s fhandle) + , mergeBuilder :: !(RunBuilder s fhandle) , mergeCaching :: !RunDataCaching -- ^ The caching policy to use for the Run in the 'MergeComplete'. 
} @@ -57,8 +58,8 @@ new :: -> Level -> Mappend -> Run.RunFsPaths - -> [Run (FS.Handle h)] - -> IO (Maybe (Merge (FS.Handle h))) + -> [Run RealWorld (FS.Handle h)] + -> IO (Maybe (Merge RealWorld (FS.Handle h))) new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do mreaders <- Readers.new fs hbio runs for mreaders $ \mergeReaders -> do @@ -75,15 +76,15 @@ new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do close :: HasFS IO h -> HasBlockIO IO h - -> Merge (FS.Handle h) + -> Merge RealWorld (FS.Handle h) -> IO () close fs hbio Merge {..} = do Builder.close fs mergeBuilder Readers.close fs hbio mergeReaders -data StepResult fhandle = MergeInProgress | MergeComplete !(Run fhandle) +data StepResult s fhandle = MergeInProgress | MergeComplete !(Run s fhandle) -stepsInvariant :: Int -> (Int, StepResult a) -> Bool +stepsInvariant :: Int -> (Int, StepResult RealWorld a) -> Bool stepsInvariant requestedSteps = \case (n, MergeInProgress) -> n >= requestedSteps _ -> True @@ -102,9 +103,9 @@ stepsInvariant requestedSteps = \case steps :: HasFS IO h -> HasBlockIO IO h - -> Merge (FS.Handle h) + -> Merge RealWorld (FS.Handle h) -> Int -- ^ How many input entries to consume (at least) - -> IO (Int, StepResult (FS.Handle h)) + -> IO (Int, StepResult RealWorld (FS.Handle h)) steps fs hbio Merge {..} requestedSteps = (\res -> assert (stepsInvariant requestedSteps res) res) <$> go 0 where @@ -176,9 +177,9 @@ steps fs hbio Merge {..} requestedSteps = writeReaderEntry :: HasFS IO h -> Level - -> RunBuilder (FS.Handle h) + -> RunBuilder RealWorld (FS.Handle h) -> SerialisedKey - -> Reader.Entry (FS.Handle h) + -> Reader.Entry RealWorld (FS.Handle h) -> IO () writeReaderEntry fs level builder key (Reader.Entry entryFull) = -- Small entry. @@ -214,9 +215,9 @@ writeReaderEntry fs level builder key entry@(Reader.EntryOverflow prefix page _ writeSerialisedEntry :: HasFS IO h -> Level - -> RunBuilder (FS.Handle h) + -> RunBuilder RealWorld (FS.Handle h) -> SerialisedKey - -> Entry SerialisedValue (BlobRef (Run (FS.Handle h))) + -> Entry SerialisedValue (BlobRef (Run RealWorld (FS.Handle h))) -> IO () writeSerialisedEntry fs level builder key entry = when (shouldWriteEntry level entry) $ diff --git a/src/Database/LSMTree/Internal/Run.hs b/src/Database/LSMTree/Internal/Run.hs index 51cfcdc7c..10aaa9d99 100644 --- a/src/Database/LSMTree/Internal/Run.hs +++ b/src/Database/LSMTree/Internal/Run.hs @@ -55,15 +55,16 @@ module Database.LSMTree.Internal.Run ( , decRefCount ) where -import Control.DeepSeq (NFData (rnf)) -import Control.Exception (Exception, finally, throwIO) +import Control.DeepSeq (NFData (..), rwhnf) import Control.Monad (when) +import Control.Monad.Class.MonadThrow +import Control.Monad.Primitive import Data.BloomFilter (Bloom) import qualified Data.ByteString.Short as SBS import Data.Foldable (for_) -import Data.IORef import Data.Primitive.ByteArray (newPinnedByteArray, unsafeFreezeByteArray) +import Data.Primitive.MutVar import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) import Database.LSMTree.Internal.BloomFilter (bloomFilterFromSBS) import qualified Database.LSMTree.Internal.CRC32C as CRC @@ -84,15 +85,14 @@ import System.FS.API (HasFS) import qualified System.FS.BlockIO.API as FS import System.FS.BlockIO.API (HasBlockIO) - -- | The in-memory representation of a completed LSM run. -- -data Run fhandle = Run { +data Run s fhandle = Run { runNumEntries :: !NumEntries -- | The reference count for the LSM run. 
This counts the -- number of references from LSM handles to this run. When -- this drops to zero the open files will be closed. - , runRefCount :: !(IORef RefCount) + , runRefCount :: !(MutVar s RefCount) -- | The file system paths for all the files used by the run. , runRunFsPaths :: !RunFsPaths -- | The bloom filter for the set of keys in this run. @@ -114,33 +114,33 @@ data Run fhandle = Run { } -- TODO: provide a proper instance that checks NoThunks for each field. -deriving via OnlyCheckWhnfNamed "Run" (Run h) instance NoThunks (Run h) +deriving via OnlyCheckWhnfNamed "Run" (Run s h) instance NoThunks (Run s h) -instance NFData fhandle => NFData (Run fhandle) where +instance NFData fhandle => NFData (Run s fhandle) where rnf (Run a b c d e f g h) = - rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` + rnf a `seq` rwhnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f `seq` rnf g `seq` rnf h -sizeInPages :: Run fhandle -> NumPages +sizeInPages :: Run s fhandle -> NumPages sizeInPages = Index.sizeInPages . runIndex -- | Increase the reference count by one. -addReference :: HasFS IO h -> Run (FS.Handle h) -> IO () +addReference :: HasFS IO h -> Run RealWorld (FS.Handle h) -> IO () addReference _ Run {..} = - atomicModifyIORef' runRefCount (\c -> (incRefCount c, ())) + atomicModifyMutVar' runRefCount (\c -> (incRefCount c, ())) -- | Decrease the reference count by one. -- After calling this operation, the run must not be used anymore. -- If the reference count reaches zero, the run is closed, removing all its -- associated files from disk. -removeReference :: HasFS IO h -> HasBlockIO IO h -> Run (FS.Handle h) -> IO () +removeReference :: HasFS IO h -> HasBlockIO IO h -> Run RealWorld (FS.Handle h) -> IO () removeReference fs hbio run@Run {..} = do - count <- atomicModifyIORef' runRefCount ((\c -> (c, c)) . decRefCount) + count <- atomicModifyMutVar' runRefCount ((\c -> (c, c)) . decRefCount) when (count <= RefCount 0) $ close fs hbio run -- | The 'BlobSpan' to read must come from this run! -readBlob :: HasFS IO h -> Run (FS.Handle h) -> BlobSpan -> IO SerialisedBlob +readBlob :: HasFS IO h -> Run RealWorld (FS.Handle h) -> BlobSpan -> IO SerialisedBlob readBlob fs Run {..} BlobSpan {..} = do let off = fromIntegral blobSpanOffset let len = fromIntegral blobSpanSize @@ -155,7 +155,7 @@ readBlob fs Run {..} BlobSpan {..} = do -- -- TODO: Once snapshots are implemented, files should get removed, but for now -- we want to be able to re-open closed runs from disk. -close :: HasFS IO h -> HasBlockIO IO h -> Run (FS.Handle h) -> IO () +close :: HasFS IO h -> HasBlockIO IO h -> Run RealWorld (FS.Handle h) -> IO () close fs hbio Run {..} = do -- TODO: removing files should drop them from the page cache, but until we -- have proper snapshotting we are keeping the files around. 
Because of @@ -191,12 +191,12 @@ fromMutable :: HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RefCount - -> RunBuilder (FS.Handle h) - -> IO (Run (FS.Handle h)) + -> RunBuilder RealWorld (FS.Handle h) + -> IO (Run RealWorld (FS.Handle h)) fromMutable fs hbio runRunDataCaching refCount builder = do (runRunFsPaths, runFilter, runIndex, runNumEntries) <- Builder.unsafeFinalise fs hbio (runRunDataCaching == NoCacheRunData) builder - runRefCount <- newIORef refCount + runRefCount <- newMutVar refCount runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode runBlobFile <- FS.hOpen fs (runBlobPath runRunFsPaths) FS.ReadMode setRunDataCaching hbio runKOpsFile runRunDataCaching @@ -215,7 +215,7 @@ fromWriteBuffer :: HasFS IO h -> RunBloomFilterAlloc -> RunFsPaths -> WriteBuffer - -> IO (Run (FS.Handle h)) + -> IO (Run RealWorld (FS.Handle h)) fromWriteBuffer fs hbio caching alloc fsPaths buffer = do builder <- Builder.new fs fsPaths (WB.numEntries buffer) alloc for_ (WB.toList buffer) $ \(k, e) -> @@ -240,7 +240,7 @@ openFromDisk :: HasFS IO h -> HasBlockIO IO h -> RunDataCaching -> RunFsPaths - -> IO (Run (FS.Handle h)) + -> IO (Run RealWorld (FS.Handle h)) openFromDisk fs hbio runRunDataCaching runRunFsPaths = do expectedChecksums <- expectValidFile (runChecksumsPath runRunFsPaths) . fromChecksumsFile @@ -259,7 +259,7 @@ openFromDisk fs hbio runRunDataCaching runRunFsPaths = do expectValidFile (forRunIndex paths) . Index.fromSBS =<< readCRC (forRunIndex expectedChecksums) (forRunIndex paths) - runRefCount <- newIORef (RefCount 1) + runRefCount <- newMutVar (RefCount 1) runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode runBlobFile <- FS.hOpen fs (runBlobPath runRunFsPaths) FS.ReadMode setRunDataCaching hbio runKOpsFile runRunDataCaching diff --git a/src/Database/LSMTree/Internal/RunBuilder.hs b/src/Database/LSMTree/Internal/RunBuilder.hs index ad18c14f1..9a56466e3 100644 --- a/src/Database/LSMTree/Internal/RunBuilder.hs +++ b/src/Database/LSMTree/Internal/RunBuilder.hs @@ -10,11 +10,12 @@ module Database.LSMTree.Internal.RunBuilder ( ) where import Control.Monad (when) +import Control.Monad.Primitive import qualified Control.Monad.ST as ST import Data.BloomFilter (Bloom) import qualified Data.ByteString.Lazy as BSL import Data.Foldable (for_, traverse_) -import Data.IORef +import Data.Primitive import Data.Traversable (for) import Data.Word (Word64) import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) @@ -49,7 +50,7 @@ import System.FS.BlockIO.API (HasBlockIO) -- -- __Not suitable for concurrent construction from multiple threads!__ -- -data RunBuilder fhandle = RunBuilder { +data RunBuilder s fhandle = RunBuilder { -- | The file system paths for all the files used by the run. runBuilderFsPaths :: !RunFsPaths @@ -57,13 +58,13 @@ data RunBuilder fhandle = RunBuilder { -- morally pure subset of the run cnstruction functionality. In -- particular it contains the (mutable) index, bloom filter and buffered -- pending output for the key\/ops file. - , runBuilderAcc :: !(RunAcc ST.RealWorld) + , runBuilderAcc :: !(RunAcc s) -- | The byte offset within the blob file for the next blob to be written. - , runBuilderBlobOffset :: !(IORef Word64) + , runBuilderBlobOffset :: !(MutVar s Word64) -- | The (write mode) file handles. - , runBuilderHandles :: {-# UNPACK #-} !(ForRunFiles (ChecksumHandle fhandle)) + , runBuilderHandles :: {-# UNPACK #-} !(ForRunFiles (ChecksumHandle s fhandle)) } -- | Create an 'RunBuilder' to start building a run. 
@@ -75,10 +76,10 @@ new :: -> RunFsPaths -> NumEntries -- ^ an upper bound of the number of entries to be added -> RunBloomFilterAlloc - -> IO (RunBuilder (FS.Handle h)) + -> IO (RunBuilder RealWorld (FS.Handle h)) new fs runBuilderFsPaths numEntries alloc = do runBuilderAcc <- ST.stToIO $ RunAcc.new numEntries alloc - runBuilderBlobOffset <- newIORef 0 + runBuilderBlobOffset <- newMutVar 0 runBuilderHandles <- traverse (makeHandle fs) (pathsForRunFiles runBuilderFsPaths) @@ -96,7 +97,7 @@ new fs runBuilderFsPaths numEntries alloc = do -- addKeyOp :: HasFS IO h - -> RunBuilder (FS.Handle h) + -> RunBuilder RealWorld (FS.Handle h) -> SerialisedKey -> Entry SerialisedValue SerialisedBlob -> IO () @@ -123,7 +124,7 @@ addKeyOp fs builder@RunBuilder{runBuilderAcc} key op = do -- addLargeSerialisedKeyOp :: HasFS IO h - -> RunBuilder (FS.Handle h) + -> RunBuilder RealWorld (FS.Handle h) -> SerialisedKey -> RawPage -> [RawOverflowPage] @@ -146,7 +147,7 @@ unsafeFinalise :: HasFS IO h -> HasBlockIO IO h -> Bool -- ^ drop caches - -> RunBuilder (FS.Handle h) + -> RunBuilder RealWorld (FS.Handle h) -> IO (RunFsPaths, Bloom SerialisedKey, IndexCompact, NumEntries) unsafeFinalise fs hbio dropCaches builder@RunBuilder {..} = do -- write final bits @@ -177,7 +178,7 @@ unsafeFinalise fs hbio dropCaches builder@RunBuilder {..} = do -- After calling this operation, the run must not be used anymore. -- -- TODO: Ensure proper cleanup even in presence of exceptions. -close :: HasFS IO h -> RunBuilder (FS.Handle h) -> IO () +close :: HasFS IO h -> RunBuilder RealWorld (FS.Handle h) -> IO () close fs RunBuilder {..} = do traverse_ (closeHandle fs) runBuilderHandles traverse_ (FS.removeFile fs) (pathsForRunFiles runBuilderFsPaths) @@ -186,46 +187,46 @@ close fs RunBuilder {..} = do Helpers -------------------------------------------------------------------------------} -writeRawPage :: HasFS IO h -> RunBuilder (FS.Handle h) -> RawPage -> IO () +writeRawPage :: HasFS IO h -> RunBuilder RealWorld (FS.Handle h) -> RawPage -> IO () writeRawPage fs RunBuilder {..} = writeToHandle fs (forRunKOps runBuilderHandles) . BSL.fromStrict . RB.unsafePinnedToByteString -- 'RawPage' is guaranteed to be pinned . RawPage.rawPageRawBytes -writeRawOverflowPages :: HasFS IO h -> RunBuilder (FS.Handle h) -> [RawOverflowPage] -> IO () +writeRawOverflowPages :: HasFS IO h -> RunBuilder RealWorld (FS.Handle h) -> [RawOverflowPage] -> IO () writeRawOverflowPages fs RunBuilder {..} = writeToHandle fs (forRunKOps runBuilderHandles) . BSL.fromChunks . 
map (RawOverflowPage.rawOverflowPageToByteString) -writeBlob :: HasFS IO h -> RunBuilder (FS.Handle h) -> SerialisedBlob -> IO BlobSpan +writeBlob :: HasFS IO h -> RunBuilder RealWorld (FS.Handle h) -> SerialisedBlob -> IO BlobSpan writeBlob fs RunBuilder{..} blob = do let size = sizeofBlob64 blob - offset <- readIORef runBuilderBlobOffset - modifyIORef' runBuilderBlobOffset (+size) + offset <- readMutVar runBuilderBlobOffset + modifyMutVar' runBuilderBlobOffset (+size) let SerialisedBlob rb = blob let lbs = BSL.fromStrict $ RB.toByteString rb writeToHandle fs (forRunBlob runBuilderHandles) lbs return (BlobSpan offset (fromIntegral size)) -writeFilter :: HasFS IO h -> RunBuilder (FS.Handle h) -> Bloom SerialisedKey -> IO () +writeFilter :: HasFS IO h -> RunBuilder RealWorld (FS.Handle h) -> Bloom SerialisedKey -> IO () writeFilter fs RunBuilder {..} bf = writeToHandle fs (forRunFilter runBuilderHandles) (bloomFilterToLBS bf) -writeIndexHeader :: HasFS IO h -> RunBuilder (FS.Handle h) -> IO () +writeIndexHeader :: HasFS IO h -> RunBuilder RealWorld (FS.Handle h) -> IO () writeIndexHeader fs RunBuilder {..} = writeToHandle fs (forRunIndex runBuilderHandles) $ Index.headerLBS -writeIndexChunk :: HasFS IO h -> RunBuilder (FS.Handle h) -> Index.Chunk -> IO () +writeIndexChunk :: HasFS IO h -> RunBuilder RealWorld (FS.Handle h) -> Index.Chunk -> IO () writeIndexChunk fs RunBuilder {..} chunk = writeToHandle fs (forRunIndex runBuilderHandles) $ BSL.fromStrict $ Index.chunkToBS chunk writeIndexFinal :: HasFS IO h - -> RunBuilder (FS.Handle h) + -> RunBuilder RealWorld (FS.Handle h) -> NumEntries -> IndexCompact -> IO () @@ -238,25 +239,25 @@ writeIndexFinal fs RunBuilder {..} numEntries index = -------------------------------------------------------------------------------} -- | Tracks the checksum of a (write mode) file handle. 
-data ChecksumHandle fhandle = ChecksumHandle !fhandle !(IORef CRC32C) +data ChecksumHandle s fhandle = ChecksumHandle !fhandle !(MutVar s CRC32C) -makeHandle :: HasFS IO h -> FS.FsPath -> IO (ChecksumHandle (FS.Handle h)) +makeHandle :: HasFS IO h -> FS.FsPath -> IO (ChecksumHandle RealWorld (FS.Handle h)) makeHandle fs path = ChecksumHandle <$> FS.hOpen fs path (FS.WriteMode FS.MustBeNew) - <*> newIORef CRC.initialCRC32C + <*> newMutVar CRC.initialCRC32C -readChecksum :: ChecksumHandle (FS.Handle h) -> IO CRC32C -readChecksum (ChecksumHandle _h checksum) = readIORef checksum +readChecksum :: ChecksumHandle RealWorld (FS.Handle h) -> IO CRC32C +readChecksum (ChecksumHandle _h checksum) = readMutVar checksum -dropCache :: HasBlockIO IO h -> ChecksumHandle (FS.Handle h) -> IO () +dropCache :: HasBlockIO IO h -> ChecksumHandle RealWorld (FS.Handle h) -> IO () dropCache hbio (ChecksumHandle h _) = FS.hDropCacheAll hbio h -closeHandle :: HasFS IO h -> ChecksumHandle (FS.Handle h) -> IO () +closeHandle :: HasFS IO h -> ChecksumHandle RealWorld (FS.Handle h) -> IO () closeHandle fs (ChecksumHandle h _checksum) = FS.hClose fs h -writeToHandle :: HasFS IO h -> ChecksumHandle (FS.Handle h) -> BSL.ByteString -> IO () +writeToHandle :: HasFS IO h -> ChecksumHandle RealWorld (FS.Handle h) -> BSL.ByteString -> IO () writeToHandle fs (ChecksumHandle h checksum) lbs = do - crc <- readIORef checksum + crc <- readMutVar checksum (_, crc') <- CRC.hPutAllChunksCRC32C fs h lbs crc - writeIORef checksum crc' + writeMutVar checksum crc' diff --git a/src/Database/LSMTree/Internal/RunReader.hs b/src/Database/LSMTree/Internal/RunReader.hs index 47f25500a..392c8ecd4 100644 --- a/src/Database/LSMTree/Internal/RunReader.hs +++ b/src/Database/LSMTree/Internal/RunReader.hs @@ -48,24 +48,24 @@ import System.FS.BlockIO.API (HasBlockIO) -- -- TODO(optimise): Reuse page buffers using some kind of allocator. However, -- deciding how long a page needs to stay around is not trivial. -data RunReader fhandle = RunReader { +data RunReader s fhandle = RunReader { readerCurrentPage :: !(IORef RawPage) -- | The index of the entry to be returned by the next call to 'next'. - , readerCurrentEntryNo :: !(PrimVar RealWorld Word16) + , readerCurrentEntryNo :: !(PrimVar s Word16) -- | Read mode file handle into the run's k\/ops file. We rely on it to -- track the position of the next disk page to read, instead of keeping -- a counter ourselves. Also, the run's handle is supposed to be opened -- with @O_DIRECT@, which is counterproductive here. , readerKOpsHandle :: !fhandle -- | The run this reader is reading from. - , readerRun :: !(Run.Run fhandle) + , readerRun :: !(Run.Run s fhandle) } new :: HasFS IO h -> HasBlockIO IO h - -> Run.Run (FS.Handle h) - -> IO (RunReader (FS.Handle h)) + -> Run.Run RealWorld (FS.Handle h) + -> IO (RunReader RealWorld (FS.Handle h)) new fs hbio readerRun = do readerKOpsHandle <- FS.hOpen fs (runKOpsPath (Run.runRunFsPaths readerRun)) FS.ReadMode @@ -83,7 +83,7 @@ new fs hbio readerRun = do close :: HasFS IO h -> HasBlockIO IO h - -> RunReader (FS.Handle h) + -> RunReader s (FS.Handle h) -> IO () close fs hbio RunReader {..} = do when (Run.runRunDataCaching readerRun == Run.NoCacheRunData) $ @@ -94,18 +94,18 @@ close fs hbio RunReader {..} = do -- | The 'SerialisedKey' and 'SerialisedValue' point into the in-memory disk -- page. Keeping them alive will also prevent garbage collection of the 4k byte -- array, so if they're long-lived, consider making a copy! 
-data Result fhandle +data Result s fhandle = Empty - | ReadEntry !SerialisedKey !(Entry fhandle) + | ReadEntry !SerialisedKey !(Entry s fhandle) -data Entry fhandle = +data Entry s fhandle = Entry - !(E.Entry SerialisedValue (BlobRef (Run fhandle))) + !(E.Entry SerialisedValue (BlobRef (Run s fhandle))) | -- | A large entry. The caller might be interested in various different -- (redundant) representation, so we return all of them. EntryOverflow -- | The value is just a prefix, with the remainder in the overflow pages. - !(E.Entry SerialisedValue (BlobRef (Run fhandle))) + !(E.Entry SerialisedValue (BlobRef (Run s fhandle))) -- | A page containing the single entry (or rather its prefix). !RawPage -- | Non-zero length of the overflow in bytes. @@ -119,11 +119,11 @@ data Entry fhandle = ![RawOverflowPage] mkEntryOverflow :: - E.Entry SerialisedValue (BlobRef (Run fhandle)) + E.Entry SerialisedValue (BlobRef (Run s fhandle)) -> RawPage -> Word32 -> [RawOverflowPage] - -> Entry fhandle + -> Entry s fhandle mkEntryOverflow entryPrefix page len overflowPages = assert (len > 0) $ assert (rawPageOverflowPages page == ceilDivPageSize (fromIntegral len)) $ @@ -131,7 +131,7 @@ mkEntryOverflow entryPrefix page len overflowPages = EntryOverflow entryPrefix page len overflowPages {-# INLINE toFullEntry #-} -toFullEntry :: Entry fhandle -> E.Entry SerialisedValue (BlobRef (Run fhandle)) +toFullEntry :: Entry s fhandle -> E.Entry SerialisedValue (BlobRef (Run s fhandle)) toFullEntry = \case Entry e -> e @@ -150,8 +150,8 @@ appendOverflow len overflowPages (SerialisedValue prefix) = next :: HasFS IO h -> HasBlockIO IO h - -> RunReader (FS.Handle h) - -> IO (Result (FS.Handle h)) + -> RunReader RealWorld (FS.Handle h) + -> IO (Result RealWorld (FS.Handle h)) next fs hbio reader@RunReader {..} = do entryNo <- readPrimVar readerCurrentEntryNo page <- readIORef readerCurrentPage diff --git a/src/Database/LSMTree/Internal/RunReaders.hs b/src/Database/LSMTree/Internal/RunReaders.hs index ac66d4464..1fcf78cdc 100644 --- a/src/Database/LSMTree/Internal/RunReaders.hs +++ b/src/Database/LSMTree/Internal/RunReaders.hs @@ -9,10 +9,10 @@ module Database.LSMTree.Internal.RunReaders ( ) where import Control.Monad (zipWithM) -import Control.Monad.Primitive (RealWorld) +import Control.Monad.Primitive import Data.Function (on) -import Data.IORef import Data.Maybe (catMaybes) +import Data.Primitive.MutVar import Data.Traversable (for) import Database.LSMTree.Internal.Run (Run) import Database.LSMTree.Internal.RunReader (RunReader) @@ -33,14 +33,14 @@ import System.FS.BlockIO.API (HasBlockIO) -- -- Creating a 'RunReaders' does not increase the runs' reference count, so make -- sure they remain open while using the 'RunReaders'. -data Readers fhandle = Readers { - readersHeap :: !(Heap.MutableHeap RealWorld (ReadCtx fhandle)) +data Readers s fhandle = Readers { + readersHeap :: !(Heap.MutableHeap s (ReadCtx fhandle)) -- | Since there is always one reader outside of the heap, we need to -- store it separately. This also contains the next k\/op to yield, unless -- all readers are drained, i.e. both: -- 1. the reader inside the 'ReadCtx' is empty -- 2. the heap is empty - , readersNext :: !(IORef (ReadCtx fhandle)) + , readersNext :: !(MutVar s (ReadCtx fhandle)) } newtype ReaderNumber = ReaderNumber Int @@ -58,14 +58,14 @@ data ReadCtx fhandle = ReadCtx { -- Using an 'STRef' could avoid reallocating the record for every entry, -- but that might not be straightforward to integrate with the heap. 
readCtxHeadKey :: !SerialisedKey - , readCtxHeadEntry :: !(Reader.Entry fhandle) + , readCtxHeadEntry :: !(Reader.Entry RealWorld fhandle) -- We could get rid of this by making 'LoserTree' stable (for which there -- is a prototype already). -- Alternatively, if we decide to have an invariant that the number in -- 'RunFsPaths' is always higher for newer runs, then we could use that -- in the 'Ord' instance. , readCtxNumber :: !ReaderNumber - , readCtxReader :: !(RunReader fhandle) + , readCtxReader :: !(RunReader RealWorld fhandle) } instance Eq (ReadCtx fhandle) where @@ -80,13 +80,13 @@ instance Ord (ReadCtx fhandle) where new :: HasFS IO h -> HasBlockIO IO h - -> [Run (FS.Handle h)] - -> IO (Maybe (Readers (FS.Handle h))) + -> [Run RealWorld (FS.Handle h)] + -> IO (Maybe (Readers RealWorld (FS.Handle h))) new fs hbio runs = do readers <- zipWithM (fromRun . ReaderNumber) [1..] runs (readersHeap, firstReadCtx) <- Heap.newMutableHeap (catMaybes readers) for firstReadCtx $ \readCtx -> do - readersNext <- newIORef readCtx + readersNext <- newMutVar readCtx return Readers {..} where fromRun n run = nextReadCtx fs hbio n =<< Reader.new fs hbio run @@ -95,10 +95,10 @@ new fs hbio runs = do close :: HasFS IO h -> HasBlockIO IO h - -> Readers (FS.Handle h) + -> Readers RealWorld (FS.Handle h) -> IO () close fs hbio Readers {..} = do - ReadCtx {readCtxReader} <- readIORef readersNext + ReadCtx {readCtxReader} <- readMutVar readersNext Reader.close fs hbio readCtxReader closeHeap where @@ -110,10 +110,10 @@ close fs hbio Readers {..} = do closeHeap peekKey :: - Readers (FS.Handle h) + Readers RealWorld (FS.Handle h) -> IO SerialisedKey peekKey Readers {..} = do - readCtxHeadKey <$> readIORef readersNext + readCtxHeadKey <$> readMutVar readersNext -- | Once a function returned 'Drained', do not use the 'Readers' any more! data HasMore = HasMore | Drained @@ -122,21 +122,21 @@ data HasMore = HasMore | Drained pop :: HasFS IO h -> HasBlockIO IO h - -> Readers (FS.Handle h) - -> IO (SerialisedKey, Reader.Entry (FS.Handle h), HasMore) + -> Readers RealWorld (FS.Handle h) + -> IO (SerialisedKey, Reader.Entry RealWorld (FS.Handle h), HasMore) pop fs hbio r@Readers {..} = do - ReadCtx {..} <- readIORef readersNext + ReadCtx {..} <- readMutVar readersNext hasMore <- dropOne fs hbio r readCtxNumber readCtxReader return (readCtxHeadKey, readCtxHeadEntry, hasMore) dropWhileKey :: HasFS IO h -> HasBlockIO IO h - -> Readers (FS.Handle h) + -> Readers RealWorld (FS.Handle h) -> SerialisedKey -> IO (Int, HasMore) -- ^ How many were dropped? 
dropWhileKey fs hbio Readers {..} key = do - cur <- readIORef readersNext + cur <- readMutVar readersNext if readCtxHeadKey cur == key then go 0 cur else return (0, HasMore) -- nothing to do @@ -156,16 +156,16 @@ dropWhileKey fs hbio Readers {..} key = do then go n' next else do - writeIORef readersNext next + writeMutVar readersNext next return (n', HasMore) dropOne :: HasFS IO h -> HasBlockIO IO h - -> Readers (FS.Handle h) + -> Readers RealWorld (FS.Handle h) -> ReaderNumber - -> RunReader (FS.Handle h) + -> RunReader RealWorld (FS.Handle h) -> IO HasMore dropOne fs hbio Readers {..} number reader = do mNext <- nextReadCtx fs hbio number reader >>= \case @@ -175,14 +175,14 @@ dropOne fs hbio Readers {..} number reader = do Nothing -> return Drained Just next -> do - writeIORef readersNext next + writeMutVar readersNext next return HasMore nextReadCtx :: HasFS IO h -> HasBlockIO IO h -> ReaderNumber - -> RunReader (FS.Handle h) + -> RunReader RealWorld (FS.Handle h) -> IO (Maybe (ReadCtx (FS.Handle h))) nextReadCtx fs hbio readCtxNumber readCtxReader = do res <- Reader.next fs hbio readCtxReader diff --git a/test/Test/Database/LSMTree/Internal/Merge.hs b/test/Test/Database/LSMTree/Internal/Merge.hs index 47ad8d978..b73200508 100644 --- a/test/Test/Database/LSMTree/Internal/Merge.hs +++ b/test/Test/Database/LSMTree/Internal/Merge.hs @@ -2,6 +2,7 @@ module Test.Database.LSMTree.Internal.Merge (tests) where +import Control.Monad.Primitive import Data.Bifoldable (bifoldMap) import qualified Data.BloomFilter as Bloom import Data.Foldable (traverse_) @@ -31,6 +32,7 @@ import Test.QuickCheck import Test.Tasty import Test.Tasty.QuickCheck + tests :: TestTree tests = testGroup "Test.Database.LSMTree.Internal.Merge" [ testProperty "prop_MergeDistributes" $ \level stepSize wbs -> @@ -164,9 +166,9 @@ mergeRuns :: FS.HasBlockIO IO h -> Merge.Level -> Word64 -> - [Run.Run (FS.Handle h)] -> + [Run.Run RealWorld (FS.Handle h)] -> StepSize -> - IO (Int, Run.Run (FS.Handle h)) + IO (Int, Run.Run RealWorld (FS.Handle h)) mergeRuns fs hbio level runNumber runs (Positive stepSize) = do Merge.new fs hbio Run.CacheRunData (RunAllocFixed 10) level mappendValues (RunFsPaths (FS.mkFsPath []) runNumber) runs >>= \case diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index 1e75a8491..41b0702f8 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -10,15 +10,16 @@ module Test.Database.LSMTree.Internal.Run ( isLargeKOp, ) where +import Control.Monad.Primitive import Data.Bifoldable (bifoldMap, bisum) import Data.Bifunctor (bimap) import Data.ByteString (ByteString) import qualified Data.ByteString as BS import qualified Data.ByteString.Short as SBS import Data.Coerce (coerce) -import Data.IORef (readIORef) import qualified Data.Map.Strict as Map import Data.Maybe (fromJust) +import Data.Primitive import qualified Data.Primitive.ByteArray as BA import System.FilePath import qualified System.FS.API as FS @@ -51,6 +52,7 @@ import qualified Database.LSMTree.Internal.WriteBuffer as WB import qualified FormatPage as Proto + tests :: TestTree tests = testGroup "Database.LSMTree.Internal.Run" [ testGroup "Write buffer to disk" @@ -226,8 +228,8 @@ prop_WriteAndOpen fs hbio (TypedWriteBuffer wb) = do written <- fromWriteBuffer fs hbio CacheRunData (RunAllocFixed 10) fsPaths wb loaded <- openFromDisk fs hbio CacheRunData fsPaths - (RefCount 1 @=?) =<< readIORef (runRefCount written) - (RefCount 1 @=?) 
=<< readIORef (runRefCount loaded) + (RefCount 1 @=?) =<< readMutVar (runRefCount written) + (RefCount 1 @=?) =<< readMutVar (runRefCount loaded) runNumEntries written @=? runNumEntries loaded runFilter written @=? runFilter loaded @@ -258,7 +260,7 @@ isLargeKOp (key, entry) = size > pageSize pageSize = 4096 size = sizeofKey key + bisum (bimap sizeofValue sizeofBlob entry) -readKOps :: FS.HasFS IO h -> FS.HasBlockIO IO h -> Run (FS.Handle h) -> IO [SerialisedKOp] +readKOps :: FS.HasFS IO h -> FS.HasBlockIO IO h -> Run RealWorld (FS.Handle h) -> IO [SerialisedKOp] readKOps fs hbio run = do reader <- Reader.new fs hbio run go reader diff --git a/test/Test/Database/LSMTree/Internal/RunReaders.hs b/test/Test/Database/LSMTree/Internal/RunReaders.hs index 7769c8328..32393d9a3 100644 --- a/test/Test/Database/LSMTree/Internal/RunReaders.hs +++ b/test/Test/Database/LSMTree/Internal/RunReaders.hs @@ -37,6 +37,7 @@ import Test.Tasty (TestTree, testGroup) import Test.Tasty.QuickCheck import Test.Util.Orphans () +import Control.Monad.Primitive (RealWorld) import Test.QuickCheck.StateModel import Test.QuickCheck.StateModel.Lockstep import qualified Test.QuickCheck.StateModel.Lockstep.Defaults as Lockstep @@ -281,7 +282,7 @@ data RealState = !(Maybe ReadersCtx) -- | Readers, together with the runs being read, so they can be cleaned up at the end -type ReadersCtx = ([Run.Run Handle], Readers Handle) +type ReadersCtx = ([Run.Run RealWorld Handle], Readers RealWorld Handle) closeReadersCtx :: FS.HasFS IO MockFS.HandleMock -> FS.HasBlockIO IO MockFS.HandleMock -> ReadersCtx -> IO () closeReadersCtx hfs hbio (runs, readers) = do @@ -337,7 +338,7 @@ runIO act lu = case act of return (hasMore, (key, fullEntry, hasMore)) expectReaders :: - (FS.HasFS IO MockFS.HandleMock -> FS.HasBlockIO IO MockFS.HandleMock -> Readers Handle -> IO (HasMore, a)) + (FS.HasFS IO MockFS.HandleMock -> FS.HasBlockIO IO MockFS.HandleMock -> Readers RealWorld Handle -> IO (HasMore, a)) -> RealMonad (Either () a) expectReaders f = ReaderT $ \(hfs, hbio) -> do @@ -354,9 +355,9 @@ runIO act lu = case act of put (RealState n Nothing) return (Right x) - toMockEntry :: FS.HasFS IO MockFS.HandleMock -> Reader.Entry Handle -> IO SerialisedEntry + toMockEntry :: FS.HasFS IO MockFS.HandleMock -> Reader.Entry RealWorld Handle -> IO SerialisedEntry toMockEntry hfs = traverse loadBlob . Reader.toFullEntry where - loadBlob :: BlobRef (Run.Run Handle) -> IO SerialisedBlob + loadBlob :: BlobRef (Run.Run RealWorld Handle) -> IO SerialisedBlob loadBlob (BlobRef run sp) = Run.readBlob hfs run sp
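
Not part of the patch itself: below is a minimal, hypothetical sketch of the parameterisation the commit message describes. The module and names (Counter, newCounter, incrCounter, demoIO, demoST) are made up for illustration only. The point is that a type holding a `MutVar s`, with operations written against `PrimMonad` and `s ~ PrimState m`, can be used from `IO` (where `PrimState IO ~ RealWorld`, matching the `Run RealWorld ...` types in the SPECIALISE pragmas above) and equally from `ST`, which the previous `IORef`-based types could not.

module CounterSketch (Counter, newCounter, incrCounter, demoIO, demoST) where

import Control.Monad.Primitive (PrimMonad, PrimState, RealWorld)
import Control.Monad.ST (runST)
import Data.Primitive.MutVar (MutVar, modifyMutVar', newMutVar, readMutVar)

-- A type parameterised by the state token 's', mirroring 'Run s h',
-- 'RunBuilder s h', etc. in this patch.
newtype Counter s = Counter (MutVar s Int)

-- Operations are written against any 'PrimMonad', instantiating 's' to
-- 'PrimState m'; in IO this is 'RealWorld'.
newCounter :: PrimMonad m => m (Counter (PrimState m))
newCounter = Counter <$> newMutVar 0

incrCounter :: PrimMonad m => Counter (PrimState m) -> m Int
incrCounter (Counter ref) = do
  modifyMutVar' ref (+ 1)
  readMutVar ref

-- The same code runs in IO ...
demoIO :: IO Int
demoIO = do
  c <- newCounter :: IO (Counter RealWorld)
  incrCounter c

-- ... and in ST, which an 'IORef'-based 'Counter' could not offer.
demoST :: Int
demoST = runST (newCounter >>= incrCounter)

If the types were instead parameterised by a monad `m`, as the commit message considers, the signatures would carry `m` directly and recover the state token as `PrimState m`; that more general option is deliberately deferred here.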