Skip to content

Commit

Permalink
Added key sorting and concurrent trimming to reduce overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
jdowning100 committed Sep 9, 2024
1 parent a0aa063 commit 347cc9b
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 85 deletions.
132 changes: 106 additions & 26 deletions consensus/blake3pow/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math/big"
"runtime"
"runtime/debug"
"sort"
"sync"
"time"

mapset "github.com/deckarep/golang-set"
Expand Down Expand Up @@ -677,16 +679,49 @@ func (blake3pow *Blake3pow) Finalize(chain consensus.ChainHeaderReader, batch et
}
}
start := time.Now()
collidingKeys, err := rawdb.ReadCollidingKeys(chain.Database(), header.ParentHash(nodeCtx))
if err != nil {
blake3pow.logger.Errorf("Failed to read colliding keys for block %s: %+v", header.ParentHash(nodeCtx).String(), err)
}
newCollidingKeys := make([][]byte, 0)
trimmedUtxos := make([]*types.SpentUtxoEntry, 0)
var wg sync.WaitGroup
var lock sync.Mutex
for denomination, depth := range trimDepths {
if header.NumberU64(nodeCtx) > depth+1 {
nextBlockToTrim := rawdb.ReadCanonicalHash(chain.Database(), header.NumberU64(nodeCtx)-depth)
TrimBlock(chain, batch, true, denomination, header.NumberU64(nodeCtx)-depth, nextBlockToTrim, &utxosDelete, &trimmedUtxos, &utxoSetSize, !setRoots, blake3pow.logger) // setRoots is false when we are processing the block
wg.Add(1)
go func(denomination uint8, depth uint64) {
nextBlockToTrim := rawdb.ReadCanonicalHash(chain.Database(), header.NumberU64(nodeCtx)-depth)
collisions := TrimBlock(chain, batch, denomination, true, header.NumberU64(nodeCtx)-depth, nextBlockToTrim, &utxosDelete, &trimmedUtxos, nil, &utxoSetSize, !setRoots, &lock, blake3pow.logger) // setRoots is false when we are processing the block
if len(collisions) > 0 {
lock.Lock()
newCollidingKeys = append(newCollidingKeys, collisions...)
lock.Unlock()
}
wg.Done()
}(denomination, depth)
}
}
if len(collidingKeys) > 0 {
wg.Add(1)
go func() {
// Trim colliding/duplicate keys here - an optimization could be to do this above in parallel with the other trims
collisions := TrimBlock(chain, batch, 0, false, 0, common.Hash{}, &utxosDelete, &trimmedUtxos, collidingKeys, &utxoSetSize, !setRoots, &lock, blake3pow.logger)
if len(collisions) > 0 {
lock.Lock()
newCollidingKeys = append(newCollidingKeys, collisions...)
lock.Unlock()
}
wg.Done()
}()
}
wg.Wait()
blake3pow.logger.Infof("Trimmed %d UTXOs from db in %s", len(trimmedUtxos), common.PrettyDuration(time.Since(start)))
if !setRoots {
rawdb.WriteTrimmedUTXOs(batch, header.Hash(), trimmedUtxos)
if len(newCollidingKeys) > 0 {
rawdb.WriteCollidingKeys(batch, header.Hash(), newCollidingKeys)
}
}
for _, hash := range utxosCreate {
multiSet.Add(hash.Bytes())
Expand Down Expand Up @@ -717,25 +752,72 @@ type UtxoEntryWithIndex struct {
Key []byte
}

func TrimBlock(chain consensus.ChainHeaderReader, batch ethdb.Batch, checkDenomination bool, denomination uint8, blockHeight uint64, blockHash common.Hash, utxosDelete *[]common.Hash, trimmedUtxos *[]*types.SpentUtxoEntry, utxoSetSize *uint64, deleteFromDb bool, logger *log.Logger) {
utxosCreated, _ := rawdb.ReadCreatedUTXOKeys(chain.Database(), blockHash)
if utxosCreated == nil {
// This is likely always going to be the case, as the prune depth will almost always be shorter than the trim depth
utxosCreated, _ = rawdb.ReadPrunedUTXOKeys(chain.Database(), blockHeight)
func TrimBlock(chain consensus.ChainHeaderReader, batch ethdb.Batch, denomination uint8, checkDenom bool, blockHeight uint64, blockHash common.Hash, utxosDelete *[]common.Hash, trimmedUtxos *[]*types.SpentUtxoEntry, collidingKeys [][]byte, utxoSetSize *uint64, deleteFromDb bool, lock *sync.Mutex, logger *log.Logger) [][]byte {
utxosCreated, _ := rawdb.ReadPrunedUTXOKeys(chain.Database(), blockHeight)
if len(utxosCreated) == 0 {
// This should almost never happen, but we need to handle it
utxosCreated, _ = rawdb.ReadCreatedUTXOKeys(chain.Database(), blockHash)
logger.Infof("Reading non-pruned UTXOs for block %d", blockHeight)
for i, key := range utxosCreated {
if len(key) == rawdb.UtxoKeyWithDenominationLength {
if key[len(key)-1] > types.MaxTrimDenomination {
// Don't keep it if the denomination is not trimmed
// The keys are sorted in order of denomination, so we can break here
break
}
key[rawdb.PrunedUtxoKeyWithDenominationLength+len(rawdb.UtxoPrefix)-1] = key[len(key)-1] // place the denomination at the end of the pruned key (11th byte will become 9th byte)
}
// Reduce key size to 9 bytes and cut off the prefix
key = key[len(rawdb.UtxoPrefix) : rawdb.PrunedUtxoKeyWithDenominationLength+len(rawdb.UtxoPrefix)]
utxosCreated[i] = key
}
}

logger.Infof("UTXOs created in block %d: %d", blockHeight, len(utxosCreated))
utxos := make(map[common.Hash][]*UtxoEntryWithIndex)
if len(collidingKeys) > 0 {
logger.Infof("Colliding keys: %d", len(collidingKeys))
utxosCreated = append(utxosCreated, collidingKeys...)
sort.Slice(utxosCreated, func(i, j int) bool {
return utxosCreated[i][len(utxosCreated[i])-1] < utxosCreated[j][len(utxosCreated[j])-1]
})
}
newCollisions := make([][]byte, 0)
duplicateKeys := make(map[[36]byte]bool) // cannot use rawdb.UtxoKeyLength for map as it's not const
// Start by grabbing all the UTXOs created in the block (that are still in the UTXO set)
for _, key := range utxosCreated {
if len(key) == 0 {
if len(key) != rawdb.PrunedUtxoKeyWithDenominationLength {
continue
}
if checkDenom {
if key[len(key)-1] != denomination {
if key[len(key)-1] > denomination {
break // The keys are stored in order of denomination, so we can stop checking here
} else {
continue
}
} else {
key = append(rawdb.UtxoPrefix, key...) // prepend the db prefix
key = key[:len(key)-1] // remove the denomination byte
}
}
// Check key in database
i := 0
it := chain.Database().NewIterator(key, nil)
for it.Next() {
data := it.Value()
if len(data) == 0 {
logger.Infof("Empty key found, denomination: %d", denomination)
continue
}
// Check if the key is a duplicate
if len(it.Key()) == rawdb.UtxoKeyLength {
key36 := [36]byte(it.Key())
if duplicateKeys[key36] {
continue
} else {
duplicateKeys[key36] = true
}
}
utxoProto := new(types.ProtoTxOut)
if err := proto.Unmarshal(data, utxoProto); err != nil {
logger.Errorf("Failed to unmarshal ProtoTxOut: %+v data: %+v key: %+v", err, data, key)
Expand All @@ -751,35 +833,33 @@ func TrimBlock(chain consensus.ChainHeaderReader, batch ethdb.Batch, checkDenomi
}).Error("Invalid utxo Proto")
continue
}
if checkDenomination && utxo.Denomination != denomination {
if checkDenom && utxo.Denomination != denomination {
continue
}
txHash, index, err := rawdb.ReverseUtxoKey(it.Key())
if err != nil {
logger.WithField("err", err).Error("Failed to parse utxo key")
continue
}
utxos[txHash] = append(utxos[txHash], &UtxoEntryWithIndex{utxo, index, it.Key()})
}
it.Release()
}

// Next, check if they are eligible for deletion and delete them
for txHash, utxoEntries := range utxos {
blockNumberForTx := rawdb.ReadTxLookupEntry(chain.Database(), txHash)
if blockNumberForTx != nil && *blockNumberForTx != blockHeight { // collision, wrong tx
logger.Infof("Collision: tx %s was created in block %d, but is in block %d", txHash.String(), *blockNumberForTx, blockHeight)
continue
}
for _, utxo := range utxoEntries {
*utxosDelete = append(*utxosDelete, types.UTXOHash(txHash, utxo.Index, utxo.UtxoEntry))
lock.Lock()
*utxosDelete = append(*utxosDelete, types.UTXOHash(txHash, index, utxo))
if deleteFromDb {
batch.Delete(utxo.Key)
*trimmedUtxos = append(*trimmedUtxos, &types.SpentUtxoEntry{OutPoint: types.OutPoint{txHash, utxo.Index}, UtxoEntry: utxo.UtxoEntry})
batch.Delete(it.Key())
*trimmedUtxos = append(*trimmedUtxos, &types.SpentUtxoEntry{OutPoint: types.OutPoint{txHash, index}, UtxoEntry: utxo})
}
*utxoSetSize--
lock.Unlock()
i++
if i >= types.MaxTrimCollisionsPerKeyPerBlock {
// This will rarely ever happen, but if it does, we should continue trimming this key in the next block
logger.WithField("blockHeight", blockHeight).Error("MaxTrimCollisionsPerBlock exceeded")
newCollisions = append(newCollisions, key)
break
}
}
it.Release()
}
return newCollisions
}

func UpdateTrimDepths(trimDepths map[uint8]uint64, utxoSetSize uint64) bool {
Expand Down
77 changes: 38 additions & 39 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ type ChainIndexer struct {

throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources

logger *log.Logger
lock sync.Mutex

logger *log.Logger
lock sync.Mutex
pruneLock sync.Mutex
indexAddressUtxos bool
utxoKeyPrunerChan chan []*types.SpentUtxoEntry
}
Expand Down Expand Up @@ -325,37 +325,39 @@ func (c *ChainIndexer) indexerLoop(currentHeader *types.WorkObject, qiIndexerCh
}

func (c *ChainIndexer) PruneOldBlockData(blockHeight uint64) {
c.pruneLock.Lock()
blockHash := rawdb.ReadCanonicalHash(c.chainDb, blockHeight)
if rawdb.ReadAlreadyPruned(c.chainDb, blockHash) {
return
}
rawdb.WriteAlreadyPruned(c.chainDb, blockHash) // Pruning can only happen once per block
c.pruneLock.Unlock()

rawdb.DeleteInboundEtxs(c.chainDb, blockHash)
rawdb.DeletePendingEtxs(c.chainDb, blockHash)
rawdb.DeletePendingEtxsRollup(c.chainDb, blockHash)
rawdb.DeleteManifest(c.chainDb, blockHash)
rawdb.DeletePbCacheBody(c.chainDb, blockHash)
rawdb.DeletePendingHeader(c.chainDb, blockHash)
createdUtxos, _ := rawdb.ReadCreatedUTXOKeys(c.chainDb, blockHash)
createdUtxosToKeep := make([][]byte, 0, len(createdUtxos))
for _, key := range createdUtxos {
/*data, _ := c.chainDb.Get(key)
if len(data) == 0 {
// Don't keep it if it doesn't exist
continue
}
utxoProto := new(types.ProtoTxOut)
if err := proto.Unmarshal(data, utxoProto); err != nil {
// Don't keep it if it can't be unmarshaled
continue
if len(createdUtxos) > 0 {
createdUtxosToKeep := make([][]byte, 0, len(createdUtxos)/2)
for _, key := range createdUtxos {
if len(key) == rawdb.UtxoKeyWithDenominationLength {
if key[len(key)-1] > types.MaxTrimDenomination {
// Don't keep it if the denomination is not trimmed
// The keys are sorted in order of denomination, so we can break here
break
}
key[rawdb.PrunedUtxoKeyWithDenominationLength+len(rawdb.UtxoPrefix)-1] = key[len(key)-1] // place the denomination at the end of the pruned key (11th byte will become 9th byte)
}
// Reduce key size to 9 bytes and cut off the prefix
key = key[len(rawdb.UtxoPrefix) : rawdb.PrunedUtxoKeyWithDenominationLength+len(rawdb.UtxoPrefix)]
createdUtxosToKeep = append(createdUtxosToKeep, key)
}
utxo := new(types.UtxoEntry)
if err := utxo.ProtoDecode(utxoProto); err != nil {
// Don't keep it if it can't be decoded into UtxoEntry
continue
}*/
// Reduce key size to 8 bytes
key = key[:8]
createdUtxosToKeep = append(createdUtxosToKeep, key)
c.logger.Infof("Removed %d utxo keys from block %d", len(createdUtxos)-len(createdUtxosToKeep), blockHeight)
rawdb.WritePrunedUTXOKeys(c.chainDb, blockHeight, createdUtxosToKeep)
}
rawdb.WritePrunedUTXOKeys(c.chainDb, blockHeight, createdUtxosToKeep)
rawdb.DeleteCreatedUTXOKeys(c.chainDb, blockHash)
/*tutxos, _ := rawdb.ReadTrimmedUTXOs(c.chainDb, blockHash)
sutxos, err := rawdb.ReadSpentUTXOs(c.chainDb, blockHash)
Expand All @@ -374,6 +376,7 @@ func (c *ChainIndexer) PruneOldBlockData(blockHeight uint64) {
rawdb.DeleteSpentUTXOs(c.chainDb, blockHash)
rawdb.DeleteTrimmedUTXOs(c.chainDb, blockHash)
rawdb.DeleteTrimDepths(c.chainDb, blockHash)
rawdb.DeleteCollidingKeys(c.chainDb, blockHash)
}

func (c *ChainIndexer) UTXOKeyPruner() {
Expand All @@ -384,45 +387,41 @@ func (c *ChainIndexer) UTXOKeyPruner() {
errc <- nil
return
case spentUtxos := <-c.utxoKeyPrunerChan:
blockHeights := make(map[uint64]uint64)
pruned := 0
start := time.Now()
for _, spentUtxo := range spentUtxos {
blockheight := rawdb.ReadTxLookupEntry(c.chainDb, spentUtxo.TxHash)
if blockheight == nil {
continue
}
utxoKeys, err := rawdb.ReadPrunedUTXOKeys(c.chainDb, *blockheight)
if err != nil || utxoKeys == nil {
currentHeight := rawdb.ReadHeaderNumber(c.chainDb, rawdb.ReadHeadHeaderHash(c.chainDb))
if currentHeight == nil {
currentHeight = new(uint64)
}
c.logger.Errorf("Failed to read pruned utxo keys: height %d currentHeight %d err %+v", *blockheight, *currentHeight, err)
continue
}
key := rawdb.UtxoKey(spentUtxo.TxHash, spentUtxo.Index)
for i := 0; i < len(utxoKeys); i++ {
if compareMinLength(utxoKeys[i], key) {
if spentUtxo.Denomination == utxoKeys[i][len(utxoKeys[i])-1] && comparePrunedKeyWithTxHash(utxoKeys[i], spentUtxo.TxHash[:]) {
// Remove the key by shifting the slice to the left
utxoKeys = append(utxoKeys[:i], utxoKeys[i+1:]...)
blockHeights[*blockheight]++
pruned++
break
} else {
i++ // Only increment i if no element was removed
}
}
rawdb.WritePrunedUTXOKeys(c.chainDb, *blockheight, utxoKeys)

}
c.logger.Infof("Pruned %d utxo keys out of %d in %s, pruned heights: %+v", pruned, len(spentUtxos), common.PrettyDuration(time.Since(start)), blockHeights)
}
}
}

func compareMinLength(a, b []byte) bool {
minLen := len(a)
if len(b) < minLen {
minLen = len(b)
}
func comparePrunedKeyWithTxHash(a, b []byte) bool {

// Compare the slices up to the length of the shorter slice
for i := 0; i < minLen; i++ {
// Compare the slices up to the length of the pruned key
// The 9th byte (position 8) is the denomination in the pruned utxo key
for i := 0; i < rawdb.PrunedUtxoKeyWithDenominationLength-1; i++ {
if a[i] != b[i] {
return false
}
Expand Down
7 changes: 5 additions & 2 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.WorkObject) error {
return nil
}

//Find a common header
commonHeader := hc.findCommonAncestor(head)
//Find a common header between the current header and the new head
commonHeader := rawdb.FindCommonAncestor(hc.headerDb, prevHeader, head, nodeCtx)
newHeader := types.CopyWorkObject(head)

// Delete each header and rollback state processor until common header
Expand Down Expand Up @@ -423,6 +423,9 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.WorkObject) error {
return err
}
for _, key := range utxoKeys {
if len(key) == rawdb.UtxoKeyWithDenominationLength {
key = key[:rawdb.UtxoKeyLength] // The last byte of the key is the denomination (but only in CreatedUTXOKeys)
}
hc.headerDb.Delete(key)
}
}
Expand Down
Loading

0 comments on commit 347cc9b

Please sign in to comment.