Skip to content

Commit

Permalink
WIP code-review
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed Nov 14, 2024
1 parent 0ce88f5 commit 6cbea64
Show file tree
Hide file tree
Showing 17 changed files with 165 additions and 244 deletions.
3 changes: 2 additions & 1 deletion ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ library
Ouroboros.Consensus.Storage.LedgerDB.V2.Common
Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory
Ouroboros.Consensus.Storage.LedgerDB.V2.Init
Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq
Ouroboros.Consensus.Storage.Serialisation
Ouroboros.Consensus.Storage.VolatileDB
Expand Down Expand Up @@ -705,6 +704,7 @@ test-suite storage-test
Test.Ouroboros.Storage.LedgerDB.V1.BackingStore.Registry
Test.Ouroboros.Storage.LedgerDB.V1.DbChangelog.QuickCheck
Test.Ouroboros.Storage.LedgerDB.V1.DbChangelog.Unit
Test.Ouroboros.Storage.LedgerDB.V1.LMDB
Test.Ouroboros.Storage.Orphans
Test.Ouroboros.Storage.TestBlock
Test.Ouroboros.Storage.VolatileDB
Expand Down Expand Up @@ -741,6 +741,7 @@ test-suite storage-test
ouroboros-consensus,
ouroboros-network-api,
ouroboros-network-mock,
ouroboros-network-testing,
pretty-show,
quickcheck-dynamic,
quickcheck-lockstep,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ closeDB ::
)
=> ChainDbHandle m blk -> m ()
closeDB (CDBHandle varState) = do
traceMarkerIO "Closing ChainDB"
mbOpenEnv <- atomically $ readTVar varState >>= \case
-- Idempotent
ChainDbClosed -> return Nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.Impl.Snapshots (
-- * Testing
, decodeLBackwardsCompatible
, encodeL
, destroySnapshots
) where

import Codec.CBOR.Decoding
Expand Down Expand Up @@ -146,6 +147,17 @@ deleteSnapshot (SomeHasFS HasFS{doesDirectoryExist, removeDirectoryRecursive}) s
exists <- doesDirectoryExist p
when exists (removeDirectoryRecursive p)

-- | Testing only! Destroy all snapshots in the DB.
destroySnapshots :: Monad m => SomeHasFS m -> m ()
destroySnapshots (SomeHasFS fs) = do
dirs <- Set.lookupMax . Set.filter (isJust . snapshotFromPath) <$> listDirectory fs (mkFsPath [])
mapM_ ((\d -> do
isDir <- doesDirectoryExist fs d
if isDir
then removeDirectoryRecursive fs d
else removeFile fs d
) . mkFsPath . (:[])) dirs

-- | Read an extended ledger state from disk
readExtLedgerState ::
forall m blk. IOLike m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,6 @@ reapplyBlock cfg b ksReader db =

-- | Apply a block on top of the ledger state and extend the DbChangelog with
-- the result ledger state.
--
-- Note that we require @c@ (from the particular choice of @Ap m l blk c@) so
-- this sometimes can throw ledger errors.
reapplyThenPush :: (Monad m, ApplyBlock l blk)
=> LedgerDbCfg l
-> blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,22 +326,11 @@ mkInternals ::
mkInternals h = TestInternals {
takeSnapshotNOW = getEnv1 h implIntTakeSnapshot
, reapplyThenPushNOW = getEnv1 h implIntReapplyThenPushBlock
, wipeLedgerDB = getEnv h $ void . destroySnapshots . ldbHasFS
, wipeLedgerDB = getEnv h $ void . destroySnapshots . snapshotsFs . ldbHasFS
, closeLedgerDB = getEnv h $ bsClose . ldbBackingStore
, truncateSnapshots = getEnv h $ void . implIntTruncateSnapshots . ldbHasFS
}

-- | Testing only! Destroy all snapshots in the DB.
destroySnapshots :: Monad m => SnapshotsFS m -> m ()
destroySnapshots (SnapshotsFS (SomeHasFS fs)) = do
dirs <- Set.lookupMax . Set.filter (isJust . snapshotFromPath) <$> listDirectory fs (mkFsPath [])
mapM_ ((\d -> do
isDir <- doesDirectoryExist fs d
if isDir
then removeDirectoryRecursive fs d
else removeFile fs d
) . mkFsPath . (:[])) dirs

-- | Testing only! Truncate all snapshots in the DB.
implIntTruncateSnapshots :: MonadThrow m => SnapshotsFS m -> m ()
implIntTruncateSnapshots (SnapshotsFS (SomeHasFS fs)) = do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.Args (

import GHC.Generics
import NoThunks.Class
import Data.Void (Void)

data LedgerDbFlavorArgs f m = V2Args HandleArgs

data HandleArgs =
InMemoryHandleArgs
-- TODO
-- | LSMHandleArgs
| LSMHandleArgs !Void
deriving (Generic, NoThunks)

data FlavorImplSpecificTrace =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,16 @@ data ForkerEnv m l blk = ForkerEnv {
-- | Config
, foeTracer :: !(Tracer m TraceForkerEvent)
-- | Release the resources
, foeResourcesToRelease :: !(StrictTVar m [m ()])
, foeResourcesToRelease :: !(StrictTVar m (m ()))
}
deriving Generic

closeForkerEnv :: IOLike m => (LedgerDBEnv m l blk, ForkerEnv m l blk) -> m ()
closeForkerEnv (LedgerDBEnv{ldbReleaseLock = AllowThunk lock}, frkEnv) =
RAWLock.withWriteAccess lock $
const $ do
sequence_ =<< readTVarIO (foeResourcesToRelease frkEnv)
id =<< readTVarIO (foeResourcesToRelease frkEnv)
atomically $ writeTVar (foeResourcesToRelease frkEnv) (pure ())
pure ((), LDBLock)

deriving instance ( IOLike m
Expand Down Expand Up @@ -273,7 +274,7 @@ newForker h ldbEnv rr st = do
let tr = LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv
traceWith tr ForkerOpen
lseqVar <- newTVarIO . LedgerSeq . AS.Empty $ st
(_, toRelease) <- allocate rr (\_ -> newTVarIO []) (readTVarIO >=> sequence_)
(_, toRelease) <- allocate rr (\_ -> newTVarIO (pure ())) (readTVarIO >=> id)
let forkerEnv = ForkerEnv {
foeLedgerSeq = lseqVar
, foeSwitchVar = ldbSeq ldbEnv
Expand Down Expand Up @@ -301,7 +302,7 @@ implForkerClose ::
-> m ()
implForkerClose (LDBHandle varState) forkerKey = do
menv <- atomically $ readTVar varState >>= \case
LedgerDBClosed -> pure Nothing
LedgerDBClosed -> pure Nothing
LedgerDBOpen ldbEnv -> fmap (ldbEnv,) <$>
stateTVar
(ldbForkers ldbEnv)
Expand Down Expand Up @@ -333,8 +334,7 @@ implForkerRangeReadTables env rq0 = do
NoPreviousQuery -> readRange (tables $ currentHandle ldb) (Nothing, n)
PreviousQueryWasFinal -> pure $ LedgerTables emptyMK
PreviousQueryWasUpTo k -> do
LedgerTables (ValuesMK m) <- readRange (tables $ currentHandle ldb) (Just k, n)
let tbs = LedgerTables $ ValuesMK $ snd $ Map.split k m
tbs <- readRange (tables $ currentHandle ldb) (Just k, n)
traceWith (foeTracer env) ForkerRangeReadTablesEnd
pure tbs

Expand Down Expand Up @@ -366,14 +366,14 @@ implForkerPush env newState = do
(duplicate (tables $ currentHandle lseq))
close
(\newtbs -> do
write newtbs tbs
pushDiffs newtbs tbs

let lseq' = extend (StateRef st newtbs) lseq

traceWith (foeTracer env) ForkerPushEnd
atomically $ do
writeTVar (foeLedgerSeq env) lseq'
modifyTVar (foeResourcesToRelease env) (close newtbs :)
modifyTVar (foeResourcesToRelease env) (>> close newtbs)
)

implForkerCommit ::
Expand All @@ -384,18 +384,26 @@ implForkerCommit env = do
LedgerSeq lseq <- readTVar foeLedgerSeq
let intersectionSlot = getTipSlot $ state $ AS.anchor lseq
let predicate = (== getTipHash (state (AS.anchor lseq))) . getTipHash . state
(statesToClose, LedgerSeq statesDiscarded) <- do
(discardedBySelection, LedgerSeq discardedByPruning) <- do
stateTVar
foeSwitchVar
(\(LedgerSeq olddb) -> fromMaybe theImpossible $ do
(olddb', toClose) <- AS.splitAfterMeasure intersectionSlot (either predicate predicate) olddb
newdb <- AS.join (const $ const True) olddb' lseq
let (l, s) = prune (foeSecurityParam env) (LedgerSeq newdb)
pure ((toClose, l), s)
-- Split the selection at the intersection point. The snd component will
-- have to be closed.
(olddb', toClose) <- AS.splitAfterMeasure intersectionSlot (either predicate predicate) olddb
-- Join the prefix of the selection with the sequence in the forker
newdb <- AS.join (const $ const True) olddb' lseq
-- Prune the resulting sequence to keep @k@ states
let (l, s) = prune (foeSecurityParam env) (LedgerSeq newdb)
pure ((toClose, l), s)
)

-- We are discarding the previous value in the TVar because we had accumulated
-- actions for closing the states pushed to the forker. As we are committing
-- those we have to close the ones discarded in this function and forget about
-- those releasing actions.
writeTVar foeResourcesToRelease $
map (close . tables) $ AS.toOldestFirst statesToClose ++ AS.toOldestFirst statesDiscarded
mapM_ (close . tables) $ AS.toOldestFirst discardedBySelection ++ AS.toOldestFirst discardedByPruning

where
ForkerEnv {
Expand Down Expand Up @@ -452,8 +460,8 @@ acquireAtPoint ldbEnv pt _ = do
dblog <- readTVarIO (ldbSeq ldbEnv)
let immTip = getTip $ anchor dblog
case currentHandle <$> rollback pt dblog of
Nothing | pointSlot pt < pointSlot immTip -> pure $ Left PointTooOld
| otherwise -> pure $ Left PointNotOnChain
Nothing | pointSlot pt < pointSlot immTip -> pure $ Left PointTooOld
| otherwise -> pure $ Left PointNotOnChain
Just (StateRef st tbs) ->
Right . StateRef st <$> duplicate tbs

Expand Down Expand Up @@ -522,7 +530,7 @@ newForkerAtFromTip ::
newForkerAtFromTip h rr n = getEnv h $ \ldbEnv@LedgerDBEnv{ldbReleaseLock = AllowThunk lock} -> do
RAWLock.withReadAccess lock (acquireAtFromTip ldbEnv n) >>= traverse (newForker h ldbEnv rr)

-- | Close all open block and header 'Follower's.
-- | Close all open 'Forker's.
closeAllForkers ::
IOLike m
=> LedgerDBEnv m l blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE ViewPatterns #-}

module Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory (
-- * LedgerTablesHandle
Expand Down Expand Up @@ -87,7 +86,7 @@ newInMemoryLedgerTablesHandle someFS@(SomeHasFS hasFS) l = do
!tv <- newTVarIO (LedgerTablesHandleOpen l)
pure LedgerTablesHandle {
close =
atomically $ modifyTVar tv (\_ -> LedgerTablesHandleClosed)
atomically $ writeTVar tv LedgerTablesHandleClosed
, duplicate = do
hs <- readTVarIO tv
!x <- guardClosed hs $ newInMemoryLedgerTablesHandle someFS
Expand All @@ -99,11 +98,11 @@ newInMemoryLedgerTablesHandle someFS@(SomeHasFS hasFS) l = do
hs <- readTVarIO tv
guardClosed hs (\(LedgerTables (ValuesMK m)) ->
pure . LedgerTables . ValuesMK . Map.take t . (maybe id (\g -> snd . Map.split g) f) $ m)
, write = \(!diffs) ->
, pushDiffs = \(!diffs) ->
atomically
$ modifyTVar tv
(\r -> guardClosed r (\st -> LedgerTablesHandleOpen (ltliftA2 rawApplyDiffs st diffs)))
, writeToDisk = \snapshotName -> do
, takeHandleSnapshot = \snapshotName -> do
createDirectoryIfMissing hasFS True $ mkFsPath [snapshotName, "tables"]
h <- readTVarIO tv
guardClosed h $
Expand All @@ -114,7 +113,7 @@ newInMemoryLedgerTablesHandle someFS@(SomeHasFS hasFS) l = do
$ valuesMKEncoder values
, tablesSize = do
hs <- readTVarIO tv
guardClosed hs (\(getLedgerTables -> ValuesMK m) -> pure $ Just $ Map.size m)
guardClosed hs (pure . Just . Map.size . getValuesMK . getLedgerTables)
, isOpen = do
hs <- readTVarIO tv
case hs of
Expand Down Expand Up @@ -144,7 +143,7 @@ writeSnapshot ::
writeSnapshot fs@(SomeHasFS hasFs) encLedger ds st = do
createDirectoryIfMissing hasFs True $ snapshotToDirPath ds
writeExtLedgerState fs encLedger (snapshotToStatePath ds) $ state st
writeToDisk (tables st) $ snapshotToDirName ds
takeHandleSnapshot (tables st) $ snapshotToDirName ds

takeSnapshot ::
( MonadThrow m
Expand Down Expand Up @@ -186,7 +185,6 @@ loadSnapshot _rr ccfg fs@(SomeHasFS hasFS) ds = do
case eExtLedgerSt of
Left err -> pure (Left $ InitFailureRead err)
Right extLedgerSt -> do
traceMarkerIO "Loaded state"
case pointToWithOriginRealPoint (castPoint (getTip extLedgerSt)) of
Origin -> pure (Left InitFailureGenesis)
NotOrigin pt -> do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,20 @@ mkInitDb :: forall m blk.
mkInitDb args flavArgs getBlock =
InitDB {
initFromGenesis = emptyF =<< lgrGenesis
, initFromSnapshot = \ds -> do
traceMarkerIO "Loading snapshot"
s <- loadSnapshot (configCodec . getExtLedgerCfg . ledgerDbCfg $ lgrConfig) lgrHasFS ds
traceMarkerIO "Loaded snapshot"
pure s
, initFromSnapshot = loadSnapshot (configCodec . getExtLedgerCfg . ledgerDbCfg $ lgrConfig) lgrHasFS
, closeDb = closeLedgerSeq
, initReapplyBlock = \a b c -> do
(LedgerSeq x, y) <- reapplyThenPush lgrRegistry a b c
mapM_ (close . tables) (AS.toOldestFirst x)
(x, y) <- reapplyThenPush lgrRegistry a b c
closeLedgerSeq x
pure y
, currentTip = ledgerState . current
, pruneDb = \lseq -> do
let (LedgerSeq rel, dbPrunedToImmDBTip) = pruneToImmTipOnly lseq
mapM_ (close . tables) (AS.toOldestFirst rel)
pure dbPrunedToImmDBTip
, mkLedgerDb = \lseq -> do
traceMarkerIO "Initialize LedgerDB"
(varDB, prevApplied) <-
(,) <$> newTVarIO lseq <*> newTVarIO Set.empty
varDB <- newTVarIO lseq
prevApplied <- newTVarIO Set.empty
forkers <- newTVarIO Map.empty
nextForkerKey <- newTVarIO (ForkerKey 0)
lock <- RAWLock.new LDBLock
Expand Down Expand Up @@ -203,7 +198,7 @@ mkInternals bss h = TestInternals {
, wipeLedgerDB = getEnv h $ destroySnapshots . ldbHasFS
, closeLedgerDB =
let LDBHandle tvar = h in
atomically (modifyTVar tvar (const LedgerDBClosed))
atomically (writeTVar tvar LedgerDBClosed)
, truncateSnapshots = getEnv h $ implIntTruncateSnapshots . ldbHasFS
}
where
Expand All @@ -217,17 +212,6 @@ mkInternals bss h = TestInternals {
InMemoryHandleArgs -> InMemory.takeSnapshot
--TODO LSMHandleArgs -> LSM.takeSnapshot

-- | Testing only! Destroy all snapshots in the DB.
destroySnapshots :: Monad m => SomeHasFS m -> m ()
destroySnapshots (SomeHasFS fs) = do
dirs <- Set.lookupMax . Set.filter (isJust . snapshotFromPath) <$> listDirectory fs (mkFsPath [])
mapM_ ((\d -> do
isDir <- doesDirectoryExist fs d
if isDir
then removeDirectoryRecursive fs d
else removeFile fs d
) . mkFsPath . (:[])) dirs

-- | Testing only! Truncate all snapshots in the DB.
implIntTruncateSnapshots :: MonadThrow m => SomeHasFS m -> m ()
implIntTruncateSnapshots (SomeHasFS fs) = do
Expand Down

This file was deleted.

Loading

0 comments on commit 6cbea64

Please sign in to comment.