Skip to content

Commit

Permalink
Calculate and compare CRC when writing and reading ledger snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
geo2a committed Nov 21, 2024
1 parent 7aeafae commit 41891a0
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ storeLedgerStateAt slotNo ledgerAppMode env = do
storeLedgerState :: ExtLedgerState blk -> IO ()
storeLedgerState ledgerState = case pointSlot pt of
NotOrigin slot -> do
let snapshot = DiskSnapshot (unSlotNo slot) (Just "db-analyser")
writeSnapshot ledgerDbFS encLedger snapshot ledgerState
let snapshot = DiskSnapshot (unSlotNo slot) (Just "db-analyser") Nothing
void $ writeSnapshot ledgerDbFS encLedger snapshot ledgerState
traceWith tracer $ SnapshotStoredEvent slot
Origin -> pure ()
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ analyse DBAnalyserConfig{analysis, confLimit, dbDir, selectDB, validation, verbo
ledgerDbFS
(decodeDiskExtLedgerState $ configCodec cfg)
decode
(DiskSnapshot slot (Just "db-analyser"))
(DiskSnapshot slot (Just "db-analyser") Nothing)
-- TODO @readSnapshot@ has type @ExceptT ReadIncrementalErr m
-- (ExtLedgerState blk)@ but it also throws exceptions! This makes
-- error handling more challenging than it ought to be. Maybe we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
Expand Down Expand Up @@ -34,7 +35,7 @@ import qualified Codec.CBOR.Write as CBOR
import Codec.Serialise.Decoding (Decoder)
import qualified Codec.Serialise.Decoding as Dec
import Codec.Serialise.Encoding (Encoding)
import Control.Monad (forM, void)
import Control.Monad (forM)
import Control.Monad.Except (ExceptT (..))
import Control.Tracer
import Data.Functor.Contravariant ((>$<))
Expand All @@ -56,6 +57,7 @@ import Ouroboros.Consensus.Util.Enclose
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Versioned
import System.FS.API.Lazy
import System.FS.CRC (CRC (..), hPutAllCRC)
import Text.Read (readMaybe)

{-------------------------------------------------------------------------------
Expand All @@ -66,7 +68,7 @@ data SnapshotFailure blk =
-- | We failed to deserialise the snapshot
--
-- This can happen due to data corruption in the ledger DB.
InitFailureRead ReadIncrementalErr
InitFailureRead ReadSnaphotErr

-- | This snapshot is too recent (ahead of the tip of the chain)
| InitFailureTooRecent (RealPoint blk)
Expand Down Expand Up @@ -116,14 +118,14 @@ takeSnapshot tracer hasFS encLedger oldest =
return Nothing
NotOrigin tip -> do
let number = unSlotNo (realPointSlot tip)
snapshot = DiskSnapshot number Nothing
snapshot = DiskSnapshot number Nothing Nothing
snapshots <- listSnapshots hasFS
if List.any ((== number) . dsNumber) snapshots then
return Nothing
else do
encloseTimedWith (TookSnapshot snapshot tip >$< tracer)
$ writeSnapshot hasFS encLedger snapshot oldest
return $ Just (snapshot, tip)
snapshotWithCRC <- writeSnapshot hasFS encLedger snapshot oldest
encloseTimedWith (TookSnapshot snapshotWithCRC tip >$< tracer) $ pure ()
return $ Just (snapshotWithCRC, tip)

-- | Trim the number of on disk snapshots so that at most 'onDiskNumSnapshots'
-- snapshots are stored on disk. The oldest snapshots are deleted.
Expand Down Expand Up @@ -161,15 +163,25 @@ data DiskSnapshot = DiskSnapshot {
-- snapshot number matching the slot number of the corresponding ledger
-- state. We only use the snapshots numbers to determine the order in
-- which we try them.
dsNumber :: Word64
dsNumber :: Word64

-- | Snapshots can optionally have a suffix, separated by the snapshot
-- number with an underscore, e.g., @4492799_last_Byron@. This suffix acts
-- as metadata for the operator of the node. Snapshots with a suffix will
-- /not be trimmed/.
, dsSuffix :: Maybe String
, dsSuffix :: Maybe String

-- | Snapshots can carry a checksum, which is filled in once they
-- have been written to disk. When reading a snapshot, the checksum
-- of the read data will be checked to match to detect data corruption.
, dsChecksum :: Maybe CRC
}
deriving (Show, Eq, Ord, Generic)
deriving (Show, Eq, Generic)

-- | TODO: CRC should really just have an @'Ord'@ instance as it's just a newtype
instance Ord DiskSnapshot where
compare (DiskSnapshot dsNumberL dsSuffixL dsChecksumL) (DiskSnapshot dsNumberR dsSuffixR dsChecksumR) =
compare (dsNumberL, dsSuffixL, getCRC <$> dsChecksumL) (dsNumberR, dsSuffixR, getCRC <$> dsChecksumR)

-- | Named snapshot are permanent, they will never be deleted when trimming.
diskSnapshotIsPermanent :: DiskSnapshot -> Bool
Expand All @@ -180,32 +192,51 @@ diskSnapshotIsPermanent = isJust . dsSuffix
diskSnapshotIsTemporary :: DiskSnapshot -> Bool
diskSnapshotIsTemporary = not . diskSnapshotIsPermanent

-- | Read snapshot from disk
data ReadSnaphotErr =
-- | Error while de-serialising data
ReadSnaphotFailed ReadIncrementalErr

-- | Checksum of read snapshot differs from the one tracked by @'DiskSnapshot'@
| ReadSnaphotBadCRC
deriving (Eq, Show)

-- | Read snapshot from disk.
--
-- Fail on data corruption, i.e. when the checksum of the read data differs
-- from the one tracked by @'DiskSnapshot'@, if any.
readSnapshot ::
forall m blk. IOLike m
=> SomeHasFS m
-> (forall s. Decoder s (ExtLedgerState blk))
-> (forall s. Decoder s (HeaderHash blk))
-> DiskSnapshot
-> ExceptT ReadIncrementalErr m (ExtLedgerState blk)
readSnapshot hasFS decLedger decHash =
ExceptT
. readIncremental hasFS decoder
. snapshotToPath
-> ExceptT ReadSnaphotErr m (ExtLedgerState blk)
readSnapshot hasFS decLedger decHash snapshot = ExceptT $
readIncremental hasFS decoder (snapshotToPath snapshot) >>= \case
Left e -> pure $ Left (ReadSnaphotFailed e)
Right (ledgerState, checksumAsRead) -> case dsChecksum snapshot of
Just checksumAsWritten ->
if checksumAsWritten /= checksumAsRead
then pure $ Left ReadSnaphotBadCRC
else pure (Right ledgerState)
Nothing -> pure (Right ledgerState)
where
decoder :: Decoder s (ExtLedgerState blk)
decoder = decodeSnapshotBackwardsCompatible (Proxy @blk) decLedger decHash

-- | Write snapshot to disk
-- | Write a snapshot to disk, returning a @'DiskSnapshot'@ with the checksum of ledger state.
--
-- The checksum of the input @'DiskSnapshot'@, if any, is overridden.
writeSnapshot ::
forall m blk. MonadThrow m
=> SomeHasFS m
-> (ExtLedgerState blk -> Encoding)
-> DiskSnapshot
-> ExtLedgerState blk -> m ()
-> ExtLedgerState blk -> m DiskSnapshot
writeSnapshot (SomeHasFS hasFS) encLedger ss cs = do
withFile hasFS (snapshotToPath ss) (WriteMode MustBeNew) $ \h ->
void $ hPut hasFS h $ CBOR.toBuilder (encode cs)
withFile hasFS (snapshotToPath ss) (WriteMode MustBeNew) $ \h -> do
(_, crc) <- hPutAllCRC hasFS h $ CBOR.toLazyByteString (encode cs)
pure ss{dsChecksum = Just crc}
where
encode :: ExtLedgerState blk -> Encoding
encode = encodeSnapshot encLedger
Expand Down Expand Up @@ -236,7 +267,7 @@ snapshotToPath = mkFsPath . (:[]) . snapshotToFileName
snapshotFromPath :: String -> Maybe DiskSnapshot
snapshotFromPath fileName = do
number <- readMaybe prefix
return $ DiskSnapshot number suffix'
return $ DiskSnapshot number suffix' Nothing
where
(prefix, suffix) = break (== '_') fileName

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import qualified Streaming as S
import qualified Streaming.Prelude as S
import Streaming.Prelude (Of (..), Stream)
import System.FS.API
import System.FS.CRC (CRC (..), initCRC, updateCRC)

{-------------------------------------------------------------------------------
Incremental parsing in I/O
Expand Down Expand Up @@ -186,24 +187,25 @@ readIncremental :: forall m a. IOLike m
=> SomeHasFS m
-> CBOR.D.Decoder (U.PrimState m) a
-> FsPath
-> m (Either ReadIncrementalErr a)
-> m (Either ReadIncrementalErr (a, CRC))
readIncremental = \(SomeHasFS hasFS) decoder fp -> do
withFile hasFS fp ReadMode $ \h ->
go hasFS h =<< U.stToIO (CBOR.R.deserialiseIncremental decoder)
go hasFS h initCRC =<< U.stToIO (CBOR.R.deserialiseIncremental decoder)
where
go :: HasFS m h
-> Handle h
-> CRC
-> CBOR.R.IDecode (U.PrimState m) a
-> m (Either ReadIncrementalErr a)
go hasFS@HasFS{..} h (CBOR.R.Partial k) = do
-> m (Either ReadIncrementalErr (a, CRC))
go hasFS@HasFS{..} h checksum (CBOR.R.Partial k) = do
bs <- hGetSome h (fromIntegral defaultChunkSize)
dec' <- U.stToIO $ k (checkEmpty bs)
go hasFS h dec'
go _ _ (CBOR.R.Done leftover _ a) =
go hasFS h (updateCRC bs checksum) dec'
go _ _ checksum (CBOR.R.Done leftover _ a) =
return $ if BS.null leftover
then Right a
then Right (a, checksum)
else Left $ TrailingBytes leftover
go _ _ (CBOR.R.Fail _ _ err) =
go _ _ _ (CBOR.R.Fail _ _ err) =
return $ Left $ ReadFailed err

checkEmpty :: ByteString -> Maybe ByteString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import Ouroboros.Network.Mock.Chain
import Ouroboros.Network.Mock.ProducerState
import Ouroboros.Network.Point
import System.FS.API
import System.FS.CRC (CRC (..))
import Test.Cardano.Slotting.TreeDiff ()
import Test.Util.ToExpr ()

Expand Down Expand Up @@ -65,6 +66,7 @@ instance ( ToExpr (TipInfo blk)
) => ToExpr (AnnTip blk)

instance ToExpr SecurityParam
instance ToExpr CRC
instance ToExpr DiskSnapshot

instance ToExpr ChunkSize
Expand Down

0 comments on commit 41891a0

Please sign in to comment.