Skip to content

Commit

Permalink
limit cache when close to tip
Browse files Browse the repository at this point in the history
  • Loading branch information
Cmdv committed Nov 1, 2024
1 parent 03ca8f1 commit c3e80f5
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ drepDistr =

newCommittee :: IOManager -> [(Text, Text)] -> Assertion
newCommittee =
withFullConfigAndLogs conwayConfigDir testLabel $ \interpreter server dbSync -> do
withFullConfig conwayConfigDir testLabel $ \interpreter server dbSync -> do
startDBSync dbSync

-- Add stake
Expand Down
6 changes: 5 additions & 1 deletion cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ migrateBootstrapUTxO syncEnv = do
where
trce = getTrace syncEnv

storeUTxOFromLedger :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> ExtLedgerState CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storeUTxOFromLedger ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
ExtLedgerState CardanoBlock ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storeUTxOFromLedger env st = case ledgerState st of
LedgerStateBabbage bts -> storeUTxO env (getUTxO bts)
LedgerStateConway stc -> storeUTxO env (getUTxO stc)
Expand Down
227 changes: 132 additions & 95 deletions cardano-db-sync/src/Cardano/DbSync/Cache.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Cardano.DbSync.Cache (
queryStakeAddrWithCache,
queryTxIdWithCache,
rollbackCache,
optimiseCaches,
tryUpdateCacheTx,

-- * CacheStatistics
Expand Down Expand Up @@ -73,18 +74,35 @@ import Ouroboros.Consensus.Cardano.Block (StandardCrypto)
-- NOTE: Other tables are not cleaned up since they are not rollbacked.
rollbackCache :: MonadIO m => CacheStatus -> DB.BlockId -> ReaderT SqlBackend m ()
rollbackCache NoCache _ = pure ()
rollbackCache (ActiveCache cache) blockId = do
rollbackCache (ActiveCache _ cache) blockId = do
liftIO $ do
atomically $ writeTVar (cPrevBlock cache) Nothing
atomically $ modifyTVar (cDatum cache) LRU.cleanup
atomically $ modifyTVar (cTxIds cache) FIFO.cleanupCache
void $ rollbackMapEpochInCache cache blockId

-- When syncing and we're close to the tip, we can optimise the caches.
optimiseCaches :: MonadIO m => CacheStatus -> ReaderT SqlBackend m CacheStatus
optimiseCaches c@(ActiveCache isCacheOptomised cache) = do
if isCacheOptomised
then liftIO $ do
-- empty caches not to be used anymore
atomically $ modifyTVar (cTxIds cache) FIFO.cleanupCache
atomically $ writeTVar (cStake cache) (StakeCache Map.empty (LRU.empty 0))
atomically $ modifyTVar (cDatum cache) (LRU.optimise 0)
-- empty then limit the capacity of the cache
atomically $ writeTVar (cMultiAssets cache) (LRU.empty 50000)
-- leaving the following caches as they are:
-- cPools, cPrevBlock, Cstats, cEpoch
pure c
else pure c
optimiseCaches c = pure c

getCacheStatistics :: CacheStatus -> IO CacheStatistics
getCacheStatistics cs =
case cs of
NoCache -> pure initCacheStatistics
ActiveCache ci -> readTVarIO (cStats ci)
ActiveCache _ ci -> readTVarIO (cStats ci)

queryOrInsertRewardAccount ::
(MonadBaseControl IO m, MonadIO m) =>
Expand Down Expand Up @@ -150,34 +168,36 @@ queryStakeAddrWithCacheRetBs ::
queryStakeAddrWithCacheRetBs _trce cache cacheUA ra@(Ledger.RewardAccount _ cred) = do
let bs = Ledger.serialiseRewardAccount ra
case cache of
NoCache -> do
mapLeft (,bs) <$> resolveStakeAddress bs
ActiveCache ci -> do
stakeCache <- liftIO $ readTVarIO (cStake ci)
case queryStakeCache cred stakeCache of
Just (addrId, stakeCache') -> do
liftIO $ hitCreds (cStats ci)
case cacheUA of
EvictAndUpdateCache -> do
liftIO $ atomically $ writeTVar (cStake ci) $ deleteStakeCache cred stakeCache'
pure $ Right addrId
_other -> do
liftIO $ atomically $ writeTVar (cStake ci) stakeCache'
pure $ Right addrId
Nothing -> do
queryRes <- mapLeft (,bs) <$> resolveStakeAddress bs
liftIO $ missCreds (cStats ci)
case queryRes of
Left _ -> pure queryRes
Right stakeAddrsId -> do
let !stakeCache' = case cacheUA of
UpdateCache -> stakeCache {scLruCache = LRU.insert cred stakeAddrsId (scLruCache stakeCache)}
UpdateCacheStrong -> stakeCache {scStableCache = Map.insert cred stakeAddrsId (scStableCache stakeCache)}
_ -> stakeCache
liftIO $
atomically $
writeTVar (cStake ci) stakeCache'
pure $ Right stakeAddrsId
NoCache -> mapLeft (,bs) <$> resolveStakeAddress bs
ActiveCache shouldOptomiseCache ci -> do
if shouldOptomiseCache
then mapLeft (,bs) <$> resolveStakeAddress bs
else do
stakeCache <- liftIO $ readTVarIO (cStake ci)
case queryStakeCache cred stakeCache of
Just (addrId, stakeCache') -> do
liftIO $ hitCreds (cStats ci)
case cacheUA of
EvictAndUpdateCache -> do
liftIO $ atomically $ writeTVar (cStake ci) $ deleteStakeCache cred stakeCache'
pure $ Right addrId
_other -> do
liftIO $ atomically $ writeTVar (cStake ci) stakeCache'
pure $ Right addrId
Nothing -> do
queryRes <- mapLeft (,bs) <$> resolveStakeAddress bs
liftIO $ missCreds (cStats ci)
case queryRes of
Left _ -> pure queryRes
Right stakeAddrsId -> do
let !stakeCache' = case cacheUA of
UpdateCache -> stakeCache {scLruCache = LRU.insert cred stakeAddrsId (scLruCache stakeCache)}
UpdateCacheStrong -> stakeCache {scStableCache = Map.insert cred stakeAddrsId (scStableCache stakeCache)}
_ -> stakeCache
liftIO $
atomically $
writeTVar (cStake ci) stakeCache'
pure $ Right stakeAddrsId

-- | True if it was found in LRU
queryStakeCache :: StakeCred -> StakeCache -> Maybe (DB.StakeAddressId, StakeCache)
Expand All @@ -204,7 +224,7 @@ queryPoolKeyWithCache cache cacheUA hsh =
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> pure $ Right phId
ActiveCache ci -> do
ActiveCache _ ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
case Map.lookup hsh mp of
Just phId -> do
Expand Down Expand Up @@ -244,7 +264,7 @@ insertPoolKeyWithCache cache cacheUA pHash =
{ DB.poolHashHashRaw = Generic.unKeyHashRaw pHash
, DB.poolHashView = Generic.unKeyHashView pHash
}
ActiveCache ci -> do
ActiveCache _ ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
case Map.lookup pHash mp of
Just phId -> do
Expand Down Expand Up @@ -306,26 +326,31 @@ queryMAWithCache ::
ReaderT SqlBackend m (Either (ByteString, ByteString) DB.MultiAssetId)
queryMAWithCache cache policyId asset =
case cache of
NoCache -> do
NoCache -> queryDb
ActiveCache isCacheOptomised ci -> do
if isCacheOptomised
then queryDb
else do
mp <- liftIO $ readTVarIO (cMultiAssets ci)
case LRU.lookup (policyId, asset) mp of
Just (maId, mp') -> do
liftIO $ hitMAssets (cStats ci)
liftIO $ atomically $ writeTVar (cMultiAssets ci) mp'
pure $ Right maId
Nothing -> do
liftIO $ missMAssets (cStats ci)
-- miss. The lookup doesn't change the cache on a miss.
let !policyBs = Generic.unScriptHash $ policyID policyId
let !assetNameBs = Generic.unAssetName asset
maId <- maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs
whenRight maId $
liftIO . atomically . modifyTVar (cMultiAssets ci) . LRU.insert (policyId, asset)
pure maId
where
queryDb = do
let !policyBs = Generic.unScriptHash $ policyID policyId
let !assetNameBs = Generic.unAssetName asset
maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs
ActiveCache ci -> do
mp <- liftIO $ readTVarIO (cMultiAssets ci)
case LRU.lookup (policyId, asset) mp of
Just (maId, mp') -> do
liftIO $ hitMAssets (cStats ci)
liftIO $ atomically $ writeTVar (cMultiAssets ci) mp'
pure $ Right maId
Nothing -> do
liftIO $ missMAssets (cStats ci)
-- miss. The lookup doesn't change the cache on a miss.
let !policyBs = Generic.unScriptHash $ policyID policyId
let !assetNameBs = Generic.unAssetName asset
maId <- maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs
whenRight maId $
liftIO . atomically . modifyTVar (cMultiAssets ci) . LRU.insert (policyId, asset)
pure maId

queryPrevBlockWithCache ::
MonadIO m =>
Expand All @@ -336,7 +361,7 @@ queryPrevBlockWithCache ::
queryPrevBlockWithCache msg cache hsh =
case cache of
NoCache -> liftLookupFail msg $ DB.queryBlockId hsh
ActiveCache ci -> do
ActiveCache _ ci -> do
mCachedPrev <- liftIO $ readTVarIO (cPrevBlock ci)
case mCachedPrev of
-- if the cached block matches the requested hash, we return its db id.
Expand Down Expand Up @@ -365,27 +390,30 @@ queryTxIdWithCache cache txIdLedger = do
case cache of
-- Direct database query if no cache.
NoCache -> DB.queryTxId txHash
ActiveCache cacheInternal -> do
-- Read current cache state.
cacheTx <- liftIO $ readTVarIO (cTxIds cacheInternal)

case FIFO.lookup txIdLedger cacheTx of
-- Cache hit, return the transaction ID.
Just txId -> do
liftIO $ hitTxIds (cStats cacheInternal)
pure $ Right txId
-- Cache miss.
Nothing -> do
eTxId <- DB.queryTxId txHash
liftIO $ missTxIds (cStats cacheInternal)
case eTxId of
Right txId -> do
-- Update cache.
liftIO $ atomically $ modifyTVar (cTxIds cacheInternal) $ FIFO.insert txIdLedger txId
-- Return ID after updating cache.
ActiveCache isCacheOptomised cacheInternal -> do
if isCacheOptomised
then DB.queryTxId txHash
else do
-- Read current cache state.
cacheTx <- liftIO $ readTVarIO (cTxIds cacheInternal)

case FIFO.lookup txIdLedger cacheTx of
-- Cache hit, return the transaction ID.
Just txId -> do
liftIO $ hitTxIds (cStats cacheInternal)
pure $ Right txId
-- Return lookup failure.
Left _ -> pure $ Left $ DB.DbLookupTxHash txHash
-- Cache miss.
Nothing -> do
eTxId <- DB.queryTxId txHash
liftIO $ missTxIds (cStats cacheInternal)
case eTxId of
Right txId -> do
-- Update cache.
liftIO $ atomically $ modifyTVar (cTxIds cacheInternal) $ FIFO.insert txIdLedger txId
-- Return ID after updating cache.
pure $ Right txId
-- Return lookup failure.
Left _ -> pure $ Left $ DB.DbLookupTxHash txHash
where
txHash = Generic.unTxHash txIdLedger

Expand All @@ -398,8 +426,10 @@ tryUpdateCacheTx ::
tryUpdateCacheTx cache ledgerTxId txId = do
case cache of
NoCache -> pure ()
ActiveCache ci -> do
liftIO $ atomically $ modifyTVar (cTxIds ci) $ FIFO.insert ledgerTxId txId
ActiveCache isCacheOptomised ci -> do
if isCacheOptomised
then pure ()
else liftIO $ atomically $ modifyTVar (cTxIds ci) $ FIFO.insert ledgerTxId txId

insertBlockAndCache ::
(MonadIO m, MonadBaseControl IO m) =>
Expand All @@ -409,12 +439,15 @@ insertBlockAndCache ::
insertBlockAndCache cache block =
case cache of
NoCache -> DB.insertBlock block
ActiveCache ci -> do
bid <- DB.insertBlock block
liftIO $ do
missPrevBlock (cStats ci)
atomically $ writeTVar (cPrevBlock ci) $ Just (bid, DB.blockHash block)
pure bid
ActiveCache isCacheOptomised ci ->
if isCacheOptomised
then DB.insertBlock block
else do
bid <- DB.insertBlock block
liftIO $ do
missPrevBlock (cStats ci)
atomically $ writeTVar (cPrevBlock ci) $ Just (bid, DB.blockHash block)
pure bid

queryDatum ::
MonadIO m =>
Expand All @@ -424,17 +457,20 @@ queryDatum ::
queryDatum cache hsh = do
case cache of
NoCache -> DB.queryDatum $ Generic.dataHashToBytes hsh
ActiveCache ci -> do
mp <- liftIO $ readTVarIO (cDatum ci)
case LRU.lookup hsh mp of
Just (datumId, mp') -> do
liftIO $ hitDatum (cStats ci)
liftIO $ atomically $ writeTVar (cDatum ci) mp'
pure $ Just datumId
Nothing -> do
liftIO $ missDatum (cStats ci)
-- miss. The lookup doesn't change the cache on a miss.
DB.queryDatum $ Generic.dataHashToBytes hsh
ActiveCache isCacheOptomised ci -> do
if isCacheOptomised
then DB.queryDatum $ Generic.dataHashToBytes hsh
else do
mp <- liftIO $ readTVarIO (cDatum ci)
case LRU.lookup hsh mp of
Just (datumId, mp') -> do
liftIO $ hitDatum (cStats ci)
liftIO $ atomically $ writeTVar (cDatum ci) mp'
pure $ Just datumId
Nothing -> do
liftIO $ missDatum (cStats ci)
-- miss. The lookup doesn't change the cache on a miss.
DB.queryDatum $ Generic.dataHashToBytes hsh

-- This assumes the entry is not cached.
insertDatumAndCache ::
Expand All @@ -447,12 +483,13 @@ insertDatumAndCache cache hsh dt = do
datumId <- DB.insertDatum dt
case cache of
NoCache -> pure datumId
ActiveCache ci -> do
liftIO $
atomically $
modifyTVar (cDatum ci) $
LRU.insert hsh datumId
pure datumId
ActiveCache isCacheOptomised ci -> do
if isCacheOptomised then pure datumId else do
liftIO $
atomically $
modifyTVar (cDatum ci) $
LRU.insert hsh datumId
pure datumId

-- Stakes
hitCreds :: StrictTVar IO CacheStatistics -> IO ()
Expand Down
10 changes: 5 additions & 5 deletions cardano-db-sync/src/Cardano/DbSync/Cache/Epoch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ readCacheEpoch :: MonadIO m => CacheStatus -> m (Maybe CacheEpoch)
readCacheEpoch cache =
case cache of
NoCache -> pure Nothing
ActiveCache ci -> do
ActiveCache _ ci -> do
cacheEpoch <- liftIO $ readTVarIO (cEpoch ci)
pure $ Just cacheEpoch

readEpochBlockDiffFromCache :: MonadIO m => CacheStatus -> m (Maybe EpochBlockDiff)
readEpochBlockDiffFromCache cache =
case cache of
NoCache -> pure Nothing
ActiveCache ci -> do
ActiveCache _ ci -> do
cE <- liftIO $ readTVarIO (cEpoch ci)
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
(_, epochInternal) -> pure epochInternal
Expand All @@ -46,7 +46,7 @@ readLastMapEpochFromCache :: CacheStatus -> IO (Maybe DB.Epoch)
readLastMapEpochFromCache cache =
case cache of
NoCache -> pure Nothing
ActiveCache ci -> do
ActiveCache _ ci -> do
cE <- readTVarIO (cEpoch ci)
let mapEpoch = ceMapEpoch cE
-- making sure db sync wasn't restarted on the last block in epoch
Expand All @@ -72,7 +72,7 @@ writeEpochBlockDiffToCache ::
writeEpochBlockDiffToCache cache epCurrent =
case cache of
NoCache -> pure $ Left $ SNErrDefault "writeEpochBlockDiffToCache: Cache is NoCache"
ActiveCache ci -> do
ActiveCache _ ci -> do
cE <- liftIO $ readTVarIO (cEpoch ci)
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
(epochLatest, _) -> writeToCache ci (CacheEpoch epochLatest (Just epCurrent))
Expand All @@ -94,7 +94,7 @@ writeToMapEpochCache syncEnv cache latestEpoch = do
NoLedger nle -> getSecurityParameter $ nleProtocolInfo nle
case cache of
NoCache -> pure $ Left $ SNErrDefault "writeToMapEpochCache: Cache is NoCache"
ActiveCache ci -> do
ActiveCache _ ci -> do
-- get EpochBlockDiff so we can use the BlockId we stored when inserting blocks
epochInternalCE <- readEpochBlockDiffFromCache cache
case epochInternalCE of
Expand Down
Loading

0 comments on commit c3e80f5

Please sign in to comment.