limit cache when close to tip
Cmdv committed Oct 29, 2024
1 parent 1b42996 commit 8651d2e
Showing 19 changed files with 311 additions and 204 deletions.
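
The changes below all follow one pattern: the cache-aware query and insert helpers now receive SlotDetails (or Maybe SlotDetails) and consult getSyncStatusInHalfHour to decide whether db-sync is close to the chain tip. When it is (SyncFollowing), the helpers bypass the in-memory caches and go straight to the database, which keeps cache memory bounded near the tip; only while lagging behind (bulk sync) do the caches stay in play. The definition of getSyncStatusInHalfHour is not part of this excerpt; the sketch below is an assumption based on its name and on the existing isSyncedWithinSeconds utility, not the actual implementation.

-- Sketch only: the real definition is not shown in this commit.
-- Assumes SlotDetails and SyncState (SyncFollowing | SyncLagging) from
-- Cardano.DbSync.Types and isSyncedWithinSeconds from Cardano.DbSync.Util.
import Cardano.DbSync.Types (SlotDetails, SyncState (..))
import Cardano.DbSync.Util (isSyncedWithinSeconds)

-- | Report SyncFollowing when the slot's time is within half an hour of the
-- wall clock, otherwise SyncLagging.
getSyncStatusInHalfHour :: SlotDetails -> SyncState
getSyncStatusInHalfHour sd = isSyncedWithinSeconds sd 1800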
@@ -71,7 +71,7 @@ drepDistr =

newCommittee :: IOManager -> [(Text, Text)] -> Assertion
newCommittee =
withFullConfigAndLogs conwayConfigDir testLabel $ \interpreter server dbSync -> do
withFullConfig conwayConfigDir testLabel $ \interpreter server dbSync -> do
startDBSync dbSync

-- Add stake
2 changes: 1 addition & 1 deletion cardano-db-sync/src/Cardano/DbSync.hs
@@ -204,7 +204,7 @@ runSyncNode metricsSetters trce iomgr dbConnString ranMigrations runMigrationFnc
unless useLedger $ liftIO $ do
logInfo trce "Migrating to a no ledger schema"
Db.noLedgerMigrations backend trce
insertValidateGenesisDist syncEnv (dncNetworkName syncNodeConfigFromFile) genCfg (useShelleyInit syncNodeConfigFromFile)
insertValidateGenesisDist syncEnv Nothing (dncNetworkName syncNodeConfigFromFile) genCfg (useShelleyInit syncNodeConfigFromFile)

-- communication channel between datalayer thread and chainsync-client thread
threadChannels <- liftIO newThreadChannels
38 changes: 24 additions & 14 deletions cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs
@@ -51,16 +51,18 @@ import qualified Ouroboros.Consensus.Shelley.Ledger.Ledger as Consensus
bootStrapMaybe ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
SlotDetails ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
bootStrapMaybe syncEnv = do
bootStrapMaybe syncEnv slotDetails = do
bts <- liftIO $ readTVarIO (envBootstrap syncEnv)
when bts $ migrateBootstrapUTxO syncEnv
when bts $ migrateBootstrapUTxO syncEnv slotDetails

migrateBootstrapUTxO ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
SlotDetails ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
migrateBootstrapUTxO syncEnv = do
migrateBootstrapUTxO syncEnv slotDetails = do
case envLedgerEnv syncEnv of
HasLedger lenv -> do
liftIO $ logInfo trce "Starting UTxO bootstrap migration"
@@ -70,7 +72,7 @@ migrateBootstrapUTxO syncEnv = do
liftIO $
logWarning trce $
"Found and deleted " <> textShow count <> " tx_out."
storeUTxOFromLedger syncEnv cls
storeUTxOFromLedger syncEnv slotDetails cls
lift $ DB.insertExtraMigration DB.BootstrapFinished
liftIO $ logInfo trce "UTxO bootstrap migration done"
liftIO $ atomically $ writeTVar (envBootstrap syncEnv) False
@@ -79,10 +81,15 @@
where
trce = getTrace syncEnv

storeUTxOFromLedger :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> ExtLedgerState CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storeUTxOFromLedger env st = case ledgerState st of
LedgerStateBabbage bts -> storeUTxO env (getUTxO bts)
LedgerStateConway stc -> storeUTxO env (getUTxO stc)
storeUTxOFromLedger ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
SlotDetails ->
ExtLedgerState CardanoBlock ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storeUTxOFromLedger env slotDetails st = case ledgerState st of
LedgerStateBabbage bts -> storeUTxO env slotDetails (getUTxO bts)
LedgerStateConway stc -> storeUTxO env slotDetails (getUTxO stc)
_otherwise -> liftIO $ logError trce "storeUTxOFromLedger is only supported after Babbage"
where
trce = getTrace env
@@ -104,9 +111,10 @@ storeUTxO ::
, NativeScript era ~ Timelock era
) =>
SyncEnv ->
SlotDetails ->
Map (TxIn StandardCrypto) (BabbageTxOut era) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storeUTxO env mp = do
storeUTxO env slotDetails mp = do
liftIO $
logInfo trce $
mconcat
Expand All @@ -115,7 +123,7 @@ storeUTxO env mp = do
, " tx_out as pages of "
, textShow pageSize
]
mapM_ (storePage env pagePerc) . zip [0 ..] . chunksOf pageSize . Map.toList $ mp
mapM_ (storePage env slotDetails pagePerc) . zip [0 ..] . chunksOf pageSize . Map.toList $ mp
where
trce = getTrace env
npages = size `div` pageSize
@@ -134,12 +142,13 @@ storePage ::
, MonadBaseControl IO m
) =>
SyncEnv ->
SlotDetails ->
Float ->
(Int, [(TxIn StandardCrypto, BabbageTxOut era)]) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storePage syncEnv percQuantum (n, ls) = do
storePage syncEnv slotDetails percQuantum (n, ls) = do
when (n `mod` 10 == 0) $ liftIO $ logInfo trce $ "Bootstrap in progress " <> prc <> "%"
txOuts <- mapM (prepareTxOut syncEnv) ls
txOuts <- mapM (prepareTxOut syncEnv slotDetails) ls
txOutIds <-
lift . DB.insertManyTxOut False $ etoTxOut . fst <$> txOuts
let maTxOuts = concatMap (mkmaTxOuts txOutTableType) $ zip txOutIds (snd <$> txOuts)
@@ -161,12 +170,13 @@ prepareTxOut ::
, NativeScript era ~ Timelock era
) =>
SyncEnv ->
SlotDetails ->
(TxIn StandardCrypto, BabbageTxOut era) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) (ExtendedTxOut, [MissingMaTxOut])
prepareTxOut syncEnv (TxIn txIntxId (TxIx index), txOut) = do
prepareTxOut syncEnv slotDetails (TxIn txIntxId (TxIx index), txOut) = do
let txHashByteString = Generic.safeHashToByteString $ unTxId txIntxId
let genTxOut = fromTxOut index txOut
txId <- liftLookupFail "prepareTxOut" $ queryTxIdWithCache cache txIntxId
txId <- liftLookupFail "prepareTxOut" $ queryTxIdWithCache slotDetails cache txIntxId
insertTxOut trce cache iopts (txId, txHashByteString) genTxOut
where
trce = getTrace syncEnv
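
Every helper in this bootstrap-UTxO path (bootStrapMaybe, migrateBootstrapUTxO, storeUTxOFromLedger, storeUTxO, storePage, prepareTxOut) now threads SlotDetails down to queryTxIdWithCache, so the tx-id cache can be skipped near the tip. The call site of bootStrapMaybe lives in the block-application code outside this excerpt; a minimal sketch of the new call shape follows, where the wrapper name is an illustrative assumption.

-- Hypothetical call site; only bootStrapMaybe is from this commit, the
-- surrounding names are assumptions. Uses the imports already present in
-- Cardano.DbSync.Api.Ledger.
applyBlockBootstrap ::
  (MonadBaseControl IO m, MonadIO m) =>
  SyncEnv ->
  SlotDetails ->
  ExceptT SyncNodeError (ReaderT SqlBackend m) ()
applyBlockBootstrap syncEnv slotDetails =
  -- run the UTxO bootstrap migration (if still pending) with the slot
  -- context of the block currently being applied
  bootStrapMaybe syncEnv slotDetails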
190 changes: 114 additions & 76 deletions cardano-db-sync/src/Cardano/DbSync/Cache.hs
@@ -22,6 +22,7 @@ module Cardano.DbSync.Cache (
queryStakeAddrWithCache,
queryTxIdWithCache,
rollbackCache,
deleteSyncingCaches,
tryUpdateCacheTx,

-- * CacheStatistics
@@ -39,6 +40,7 @@ import Cardano.DbSync.Era.Shelley.Query
import Cardano.DbSync.Era.Util
import Cardano.DbSync.Error
import Cardano.DbSync.Types
import Cardano.DbSync.Util (getSyncStatusInHalfHour)
import qualified Cardano.Ledger.Address as Ledger
import Cardano.Ledger.BaseTypes (Network)
import Cardano.Ledger.Mary.Value
@@ -80,6 +82,17 @@ rollbackCache (ActiveCache cache) blockId = do
atomically $ modifyTVar (cTxIds cache) FIFO.cleanupCache
void $ rollbackMapEpochInCache cache blockId

-- TODO: need to work out which caches to clean up.
deleteSyncingCaches :: MonadIO m => CacheStatus -> ReaderT SqlBackend m ()
deleteSyncingCaches NoCache = pure ()
deleteSyncingCaches (ActiveCache cache) = do
liftIO $ do
atomically $ writeTVar (cPrevBlock cache) Nothing
atomically $ modifyTVar (cDatum cache) LRU.cleanup
atomically $ modifyTVar (cTxIds cache) FIFO.cleanupCache
atomically $ writeTVar (cStake cache) (StakeCache Map.empty (LRU.empty 0))
atomically $ writeTVar (cMultiAssets cache) (LRU.empty 0)
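
deleteSyncingCaches is new in this commit, but no call site appears in this excerpt and the TODO above notes the exact cache set is still undecided. A hedged sketch of the presumed usage: once a block's slot details show db-sync is within half an hour of the tip, the bulk-sync caches are no longer consulted, so they can be emptied to cap memory. The wrapper name below is an illustrative assumption.

-- Hypothetical call site, not part of this commit.
pruneCachesNearTip ::
  MonadIO m =>
  SlotDetails ->
  CacheStatus ->
  ReaderT SqlBackend m ()
pruneCachesNearTip slotDetails cache =
  case getSyncStatusInHalfHour slotDetails of
    -- near the tip: the caches are bypassed anyway, so release the memory
    SyncFollowing -> deleteSyncingCaches cache
    -- still bulk syncing: keep the caches warm
    SyncLagging -> pure ()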

getCacheStatistics :: CacheStatus -> IO CacheStatistics
getCacheStatistics cs =
case cs of
@@ -193,42 +206,54 @@ deleteStakeCache scred scache =

queryPoolKeyWithCache ::
MonadIO m =>
Maybe SlotDetails ->
CacheStatus ->
CacheAction ->
PoolKeyHash ->
ReaderT SqlBackend m (Either DB.LookupFail DB.PoolHashId)
queryPoolKeyWithCache cache cacheUA hsh =
case cache of
NoCache -> do
mPhId <- DB.queryPoolHashId (Generic.unKeyHashRaw hsh)
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> pure $ Right phId
ActiveCache ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
case Map.lookup hsh mp of
Just phId -> do
liftIO $ hitPools (cStats ci)
-- hit so we can't cache even with 'CacheNew'
when (cacheUA == EvictAndUpdateCache) $
liftIO $
atomically $
modifyTVar (cPools ci) $
Map.delete hsh
pure $ Right phId
Nothing -> do
liftIO $ missPools (cStats ci)
queryPoolKeyWithCache mSlotDetails cache cacheUA hsh =
case mSlotDetails of
Nothing -> query
Just slotDetails ->
case getSyncStatusInHalfHour slotDetails of
SyncFollowing -> do
mPhId <- DB.queryPoolHashId (Generic.unKeyHashRaw hsh)
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> do
-- missed so we can't evict even with 'EvictAndReturn'
when (shouldCache cacheUA) $
liftIO $
atomically $
modifyTVar (cPools ci) $
Map.insert hsh phId
pure $ Right phId
Just phId -> pure $ Right phId
SyncLagging -> query
where
query = case cache of
NoCache -> do
mPhId <- DB.queryPoolHashId (Generic.unKeyHashRaw hsh)
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> pure $ Right phId
ActiveCache ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
case Map.lookup hsh mp of
Just phId -> do
liftIO $ hitPools (cStats ci)
-- hit so we can't cache even with 'CacheNew'
when (cacheUA == EvictAndUpdateCache) $
liftIO $
atomically $
modifyTVar (cPools ci) $
Map.delete hsh
pure $ Right phId
Nothing -> do
liftIO $ missPools (cStats ci)
mPhId <- DB.queryPoolHashId (Generic.unKeyHashRaw hsh)
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> do
-- missed so we can't evict even with 'EvictAndReturn'
when (shouldCache cacheUA) $
liftIO $
atomically $
modifyTVar (cPools ci) $
Map.insert hsh phId
pure $ Right phId
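
queryPoolKeyWithCache gains a Maybe SlotDetails argument: Nothing (for example at genesis, where there is no slot to compare against the clock) keeps the old cache-first lookup, while Just slot details enables the near-tip bypass. A small sketch of the resulting behaviour, for illustration only:

-- Illustration only, not code from this commit: whether the cPools map is
-- consulted for a pool-key lookup after this change.
usesPoolCache :: Maybe SlotDetails -> Bool
usesPoolCache mSlotDetails =
  case mSlotDetails of
    Nothing -> True -- no slot context: keep the cached path
    Just sd ->
      case getSyncStatusInHalfHour sd of
        SyncFollowing -> False -- near the tip: always query the database
        SyncLagging -> True -- bulk sync: keep using the cache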

insertPoolKeyWithCache ::
(MonadBaseControl IO m, MonadIO m) =>
@@ -274,13 +299,14 @@ queryPoolKeyOrInsert ::
(MonadBaseControl IO m, MonadIO m) =>
Text ->
Trace IO Text ->
Maybe SlotDetails ->
CacheStatus ->
CacheAction ->
Bool ->
PoolKeyHash ->
ReaderT SqlBackend m DB.PoolHashId
queryPoolKeyOrInsert txt trce cache cacheUA logsWarning hsh = do
pk <- queryPoolKeyWithCache cache cacheUA hsh
queryPoolKeyOrInsert txt trce slotDetails cache cacheUA logsWarning hsh = do
pk <- queryPoolKeyWithCache slotDetails cache cacheUA hsh
case pk of
Right poolHashId -> pure poolHashId
Left err -> do
Expand Down Expand Up @@ -329,24 +355,28 @@ queryMAWithCache cache policyId asset =

queryPrevBlockWithCache ::
MonadIO m =>
SlotDetails ->
Text ->
CacheStatus ->
ByteString ->
ExceptT SyncNodeError (ReaderT SqlBackend m) DB.BlockId
queryPrevBlockWithCache msg cache hsh =
case cache of
NoCache -> liftLookupFail msg $ DB.queryBlockId hsh
ActiveCache ci -> do
mCachedPrev <- liftIO $ readTVarIO (cPrevBlock ci)
case mCachedPrev of
-- if the cached block matches the requested hash, we return its db id.
Just (cachedBlockId, cachedHash) ->
if cachedHash == hsh
then do
liftIO $ hitPBlock (cStats ci)
pure cachedBlockId
else queryFromDb ci
Nothing -> queryFromDb ci
queryPrevBlockWithCache slotDetails msg cache hsh =
case getSyncStatusInHalfHour slotDetails of
SyncFollowing -> liftLookupFail msg $ DB.queryBlockId hsh
SyncLagging ->
case cache of
NoCache -> liftLookupFail msg $ DB.queryBlockId hsh
ActiveCache ci -> do
mCachedPrev <- liftIO $ readTVarIO (cPrevBlock ci)
case mCachedPrev of
-- if the cached block matches the requested hash, we return its db id.
Just (cachedBlockId, cachedHash) ->
if cachedHash == hsh
then do
liftIO $ hitPBlock (cStats ci)
pure cachedBlockId
else queryFromDb ci
Nothing -> queryFromDb ci
where
queryFromDb ::
MonadIO m =>
Expand All @@ -358,34 +388,38 @@ queryPrevBlockWithCache msg cache hsh =

queryTxIdWithCache ::
MonadIO m =>
SlotDetails ->
CacheStatus ->
Ledger.TxId StandardCrypto ->
ReaderT SqlBackend m (Either DB.LookupFail DB.TxId)
queryTxIdWithCache cache txIdLedger = do
case cache of
-- Direct database query if no cache.
NoCache -> DB.queryTxId txHash
ActiveCache cacheInternal -> do
-- Read current cache state.
cacheTx <- liftIO $ readTVarIO (cTxIds cacheInternal)

case FIFO.lookup txIdLedger cacheTx of
-- Cache hit, return the transaction ID.
Just txId -> do
liftIO $ hitTxIds (cStats cacheInternal)
pure $ Right txId
-- Cache miss.
Nothing -> do
eTxId <- DB.queryTxId txHash
liftIO $ missTxIds (cStats cacheInternal)
case eTxId of
Right txId -> do
-- Update cache.
liftIO $ atomically $ modifyTVar (cTxIds cacheInternal) $ FIFO.insert txIdLedger txId
-- Return ID after updating cache.
queryTxIdWithCache slotDetails cache txIdLedger = do
case getSyncStatusInHalfHour slotDetails of
SyncFollowing -> DB.queryTxId txHash
SyncLagging ->
case cache of
-- Direct database query if no cache.
NoCache -> DB.queryTxId txHash
ActiveCache cacheInternal -> do
-- Read current cache state.
cacheTx <- liftIO $ readTVarIO (cTxIds cacheInternal)

case FIFO.lookup txIdLedger cacheTx of
-- Cache hit, return the transaction ID.
Just txId -> do
liftIO $ hitTxIds (cStats cacheInternal)
pure $ Right txId
-- Return lookup failure.
Left _ -> pure $ Left $ DB.DbLookupTxHash txHash
-- Cache miss.
Nothing -> do
eTxId <- DB.queryTxId txHash
liftIO $ missTxIds (cStats cacheInternal)
case eTxId of
Right txId -> do
-- Update cache.
liftIO $ atomically $ modifyTVar (cTxIds cacheInternal) $ FIFO.insert txIdLedger txId
-- Return ID after updating cache.
pure $ Right txId
-- Return lookup failure.
Left _ -> pure $ Left $ DB.DbLookupTxHash txHash
where
txHash = Generic.unTxHash txIdLedger

@@ -403,18 +437,22 @@ tryUpdateCacheTx cache ledgerTxId txId = do

insertBlockAndCache ::
(MonadIO m, MonadBaseControl IO m) =>
SlotDetails ->
CacheStatus ->
DB.Block ->
ReaderT SqlBackend m DB.BlockId
insertBlockAndCache cache block =
insertBlockAndCache slotDetails cache block =
case cache of
NoCache -> DB.insertBlock block
ActiveCache ci -> do
bid <- DB.insertBlock block
liftIO $ do
missPrevBlock (cStats ci)
atomically $ writeTVar (cPrevBlock ci) $ Just (bid, DB.blockHash block)
pure bid
ActiveCache ci ->
case getSyncStatusInHalfHour slotDetails of
SyncFollowing -> DB.insertBlock block
SyncLagging -> do
bid <- DB.insertBlock block
liftIO $ do
missPrevBlock (cStats ci)
atomically $ writeTVar (cPrevBlock ci) $ Just (bid, DB.blockHash block)
pure bid
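
Note the symmetry with queryPrevBlockWithCache above: when following the tip, insertBlockAndCache no longer records the new block in cPrevBlock and queryPrevBlockWithCache no longer reads it, so both sides fall back to direct database lookups and the stale cache entry is harmless. A hedged sketch of the combined flow follows; the wrapper name and the explicit prevHash argument are illustrative assumptions.

-- Hypothetical wrapper, not part of this commit.
insertThenResolvePrev ::
  (MonadIO m, MonadBaseControl IO m) =>
  SlotDetails ->
  CacheStatus ->
  DB.Block ->
  ByteString -> -- hash of the previous block
  ExceptT SyncNodeError (ReaderT SqlBackend m) DB.BlockId
insertThenResolvePrev slotDetails cache block prevHash = do
  -- near the tip this skips the cPrevBlock update ...
  bid <- lift $ insertBlockAndCache slotDetails cache block
  -- ... and this resolves the previous hash straight from the database
  _prevId <- queryPrevBlockWithCache slotDetails "insertThenResolvePrev" cache prevHash
  pure bid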

queryDatum ::
MonadIO m =>