Skip to content

Commit

Permalink
netsync: Track best known blocks per peer.
Browse files Browse the repository at this point in the history
This reworks the net sync manager block announcement handling to keep
track of the best known block announced by each peer as determined by
having the most cumulative proof of work.

The primary motivation is to provide a relatively efficient mechanism to
discover which blocks are available to download from each peer to
eventually support downloading multiple blocks in parallel.

It also doubles to increase robustness of best height reporting of each
peer once the initial headers sync process is complete since the values
are only updated when the header has more cumulative proof of work
versus simply having a larger height since, although exceedingly rare in
practice, it is possible for a chain with fewer blocks to have more
cumulative work.
  • Loading branch information
davecgh committed Sep 11, 2024
1 parent f3d7099 commit 01dfc85
Showing 1 changed file with 158 additions and 57 deletions.
215 changes: 158 additions & 57 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,26 @@ type Peer struct {
// longer useful or are otherwise being malicious.
numConsecutiveOrphanHeaders int32

lastAnnouncedBlock *chainhash.Hash
// These fields are used to track the best known block announced by the peer
// which in turn provides a means to discover which blocks are available to
// download from the peer.
//
// announcedOrphanBlock is the hash of the most recently announced block
// that did not connect to any headers known to the local chain at the time
// of the announcement. It is tracked because such announcements are
// typically for newly found blocks whose parent headers will eventually
// become known and therefore have a fairly good chance of becoming the
// block with the most cumulative proof of work that the peer has announced.
//
// bestAnnouncedBlock is the hash of the block with the most cumulative
// proof of work that the peer has announced that is also known to the local
// chain.
//
// bestAnnouncedWork is the cumulative proof of work for the associated best
// announced block hash.
announcedOrphanBlock *chainhash.Hash
bestAnnouncedBlock *chainhash.Hash
bestAnnouncedWork *uint256.Uint256
}

// NewPeer returns a new instance of a peer that wraps the provided underlying
Expand Down Expand Up @@ -649,6 +668,21 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *Peer) {

m.peers[peer] = struct{}{}

// Request headers starting from the parent of the best known header for the
// local chain immediately when the initial headers sync process is complete
// and the peer is a sync candidate.
//
// This primarily serves two purposes:
//
// 1) It immediately discovers any blocks that are not already known
// 2) It provides accurate discovery of the best known block of the peer
//
// Note that the parent is used because the request would otherwise result
// in an empty response when both the local and remote tips are the same.
if peer.syncCandidate && m.hdrSyncState.headersSynced {
m.fetchNextHeaders(peer)
}

// Start syncing by choosing the best candidate if needed.
if peer.syncCandidate && m.syncPeer == nil {
m.startSync()
Expand Down Expand Up @@ -891,6 +925,55 @@ func (m *SyncManager) maybeUpdateIsCurrent() {
}
}

// maybeUpdateBestAnnouncedBlock potentially updates the block with the most
// cumulative proof of work that the given peer has announced which includes its
// associated hash, cumulative work sum, and height.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) maybeUpdateBestAnnouncedBlock(p *Peer, hash *chainhash.Hash, header *wire.BlockHeader) {
chain := m.cfg.Chain
workSum, err := chain.ChainWork(hash)
if err != nil {
return
}

// Update the best block and associated values when the cumulative work for
// given block exceeds that of the current best known block for the peer.
if p.bestAnnouncedWork == nil || workSum.Gt(p.bestAnnouncedWork) {
p.bestAnnouncedBlock = hash
p.bestAnnouncedWork = &workSum
p.UpdateLastBlockHeight(int64(header.Height))
}
}

// maybeResolveOrphanBlock potentially resolves the most recently announced
// block by the peer that did not connect to any headers known to the local
// chain at the time of the announcement by checking if it is now known and,
// when it is, potentially making it the block with the most cumulative proof of
// work announced by the peer if needed.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) maybeResolveOrphanBlock(p *Peer) {
// Nothing to do if there isn't a pending orphan block announcement that has
// not yet been resolved or the block still isn't known.
chain := m.cfg.Chain
blockHash := p.announcedOrphanBlock
if blockHash == nil || !chain.HaveHeader(blockHash) {
return
}

// The block has now been resolved, so potentially make it the block with
// the most cumulative proof of work announced by the peer.
header, err := chain.HeaderByHash(blockHash)
if err != nil {
log.Warnf("Unable to retrieve known good header %s: %v", blockHash, err)
return
}
m.maybeUpdateBestAnnouncedBlock(p, blockHash, &header)
}

// processBlock processes the provided block using the internal chain instance.
//
// When no errors occurred during processing, the first return value indicates
Expand Down Expand Up @@ -1057,30 +1140,6 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) {
m.cfg.MixPool.ExpireMessagesInBackground(header.Height)
}

// Update the latest block height for the peer to avoid stale heights when
// looking for future potential sync node candidacy.
//
// Also, when the chain is considered current and the block was accepted to
// the main chain, update the heights of other peers whose invs may have
// been ignored when actively syncing while the chain was not yet current or
// lost the lock announcement race.
blockHeight := int64(header.Height)
peer.UpdateLastBlockHeight(blockHeight)
if onMainChain && m.IsCurrent() {
for p := range m.peers {
// The height for the sending peer is already updated.
if p == peer {
continue
}

lastAnnBlock := p.lastAnnouncedBlock
if lastAnnBlock != nil && *lastAnnBlock == *blockHash {
p.UpdateLastBlockHeight(blockHeight)
p.lastAnnouncedBlock = nil
}
}
}

// Request more blocks using the headers when the request queue is getting
// short.
if peer == m.syncPeer && len(peer.requestedBlocks) < minInFlightBlocks {
Expand Down Expand Up @@ -1167,45 +1226,66 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
firstHeaderConnects := chain.HaveHeader(&firstHeader.PrevBlock)
headersSynced := m.hdrSyncState.headersSynced
if !firstHeaderConnects {
// Ignore headers that do not connect to any known headers when the
// initial headers sync is taking place. It is expected that headers
// will be announced that are not yet known.
if !headersSynced {
return
}

// Attempt to detect block announcements which do not connect to any
// known headers and request any headers starting from the best header
// the local chain knows in order to (hopefully) discover the missing
// headers.
// headers unless the initial headers sync process is still in progress.
//
// Meanwhile, also keep track of how many times the peer has
// consecutively sent a headers message that does not connect and
// disconnect it once the max allowed threshold has been reached.
// consecutively sent a headers message that looks like an announcement
// that does not connect and disconnect it once the max allowed
// threshold has been reached.
if numHeaders < maxExpectedHeaderAnnouncementsPerMsg {
peer.numConsecutiveOrphanHeaders++
if peer.numConsecutiveOrphanHeaders >= maxConsecutiveOrphanHeaders {
log.Debugf("Received %d consecutive headers messages that do "+
"not connect from peer %s -- disconnecting",
peer.numConsecutiveOrphanHeaders, peer)
peer.Disconnect()
return
}

log.Debugf("Requesting missing parents for header %s (height %d) "+
"received from peer %s", firstHeaderHash, firstHeader.Height,
peer)
bestHeaderHash, _ := chain.BestHeader()
blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash)
locator := chainBlockLocatorToHashes(blkLocator)
peer.PushGetHeadersMsg(locator, &zeroHash)
if headersSynced {
log.Debugf("Requesting missing parents for header %s (height "+
"%d) received from peer %s", firstHeaderHash,
firstHeader.Height, peer)
bestHeaderHash, _ := chain.BestHeader()
blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash)
locator := chainBlockLocatorToHashes(blkLocator)
peer.PushGetHeadersMsg(locator, &zeroHash)
}

// Track the final announced header as the most recently announced
// block by the peer that does not connect to any headers known to
// the local chain since there is a good chance it will eventually
// become known either from this peer or others.
m.maybeResolveOrphanBlock(peer)
finalHeader := headers[len(headers)-1]
finalHeaderHash := finalHeader.BlockHash()
peer.announcedOrphanBlock = &finalHeaderHash

// Update the latest block height for the peer to avoid stale
// heights when looking for future potential header sync node
// candidacy when the initial headers sync process is still in
// progess.
if !headersSynced {
peer.UpdateLastBlockHeight(int64(finalHeader.Height))
}
return
}

// The initial headers sync process is done and this does not appear to
// be a block announcement, so disconnect the peer.
log.Debugf("Received orphan header from peer %s -- disconnecting", peer)
peer.Disconnect()
// Disconnect the peer when the initial headers sync process is done and
// this does not appear to be a block announcement.
if headersSynced {
log.Debugf("Received orphan header from peer %s -- disconnecting",
peer)
peer.Disconnect()
return
}

// Ignore headers that do not connect to any known headers when the
// initial headers sync is taking place. It is expected that headers
// will be announced that are not yet known.
return
}

Expand Down Expand Up @@ -1273,12 +1353,13 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// of the provided headers are successfully processed above.
peer.numConsecutiveOrphanHeaders = 0

// Update the last announced block to the final one in the announced headers
// above and update the height for the peer too.
// Potentially resolve a previously unknown announced block and then update
// the block with the most cumulative proof of work the peer has announced
// to the final announced header if needed.
finalHeader := headers[len(headers)-1]
finalReceivedHash := &headerHashes[len(headerHashes)-1]
peer.lastAnnouncedBlock = finalReceivedHash
peer.UpdateLastBlockHeight(int64(finalHeader.Height))
m.maybeResolveOrphanBlock(peer)
m.maybeUpdateBestAnnouncedBlock(peer, finalReceivedHash, finalHeader)

// Update the sync height if the new best known header height exceeds it.
syncHeight := m.SyncHeight()
Expand Down Expand Up @@ -1335,6 +1416,18 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
log.Info("Syncing chain")
m.progressLogger.SetLastLogTime(time.Now())

// Request headers starting from the parent of the best known header
// for the local chain from any sync candidates that have not yet
// had their best known block discovered now that the initial
// headers sync process is complete.
for peer := range m.peers {
m.maybeResolveOrphanBlock(peer)
if !peer.syncCandidate || peer.bestAnnouncedBlock != nil {
continue
}
m.fetchNextHeaders(peer)
}

// Potentially update whether the chain believes it is current now
// that the headers are synced.
chain.MaybeUpdateIsCurrent()
Expand Down Expand Up @@ -1534,15 +1627,23 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) {
}

if lastBlock != nil {
// Update the last announced block to the final one in the announced
// inventory above (if any). In the case the header for that block is
// already known, use that information to update the height for the peer
// too.
peer.lastAnnouncedBlock = &lastBlock.Hash
if isCurrent {
// Determine if the final announced block is already known to the local
// chain and then either track it as the most recently announced
// block by the peer that does not connect to any headers known to the
// local chain or potentially make it the block with the most cumulative
// proof of work announced by the peer when it is already known.
if !m.cfg.Chain.HaveHeader(&lastBlock.Hash) {
// Notice a copy of the hash is made here to avoid keeping a
// reference into the inventory vector which would prevent it from
// being GCd.
lastBlockHash := lastBlock.Hash
m.maybeResolveOrphanBlock(peer)
peer.announcedOrphanBlock = &lastBlockHash
} else {
header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash)
if err == nil {
peer.UpdateLastBlockHeight(int64(header.Height))
m.maybeResolveOrphanBlock(peer)
m.maybeUpdateBestAnnouncedBlock(peer, &lastBlock.Hash, &header)
}
}
}
Expand Down

0 comments on commit 01dfc85

Please sign in to comment.