Skip to content

Commit

Permalink
netsync: Re-request data sooner after peer disconnect.
Browse files Browse the repository at this point in the history
This change speeds up the relay of blocks and transactions when the
peer we have queried for this data disconnects.  If any other peers
have also announced this data, request the data again immediately from
them instead of waiting for a slower peer to announce the data again
through an inventory message.
  • Loading branch information
jrick authored Mar 8, 2023
1 parent 9a57b66 commit 1510b02
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,19 +650,64 @@ func (m *SyncManager) handleDonePeerMsg(p *peerpkg.Peer) {
// Remove the peer from the list of candidate peers.
delete(m.peers, p)

// Remove requested transactions from the global map so that they will
// be fetched from elsewhere.
// Re-request in-flight blocks and transactions that were not received
// by the disconnected peer if the data was announced by another peer.
// Remove the data from the manager's requested data maps if no other
// peers have announced the data.
requestQueues := make(map[*peerpkg.Peer][]*wire.InvVect)
var inv wire.InvVect
inv.Type = wire.InvTypeTx
TxHashes:
for txHash := range peer.requestedTxns {
inv.Hash = txHash
for pp, mgrPeer := range m.peers {
if !pp.IsKnownInventory(&inv) {
continue
}
inv := inv
invs := append(requestQueues[pp], &inv)
requestQueues[pp] = invs
mgrPeer.requestedTxns[txHash] = struct{}{}
continue TxHashes
}
// No peers found that have announced this data.
delete(m.requestedTxns, txHash)
}

// Remove requested blocks from the global map so that they will be
// fetched from elsewhere.
// TODO(oga) we could possibly here check which peers have these blocks
// and request them now to speed things up a little.
inv.Type = wire.InvTypeBlock
BlockHashes:
for blockHash := range peer.requestedBlocks {
inv.Hash = blockHash
for pp, mgrPeer := range m.peers {
if !pp.IsKnownInventory(&inv) {
continue
}
inv := inv
invs := append(requestQueues[pp], &inv)
requestQueues[pp] = invs
mgrPeer.requestedBlocks[blockHash] = struct{}{}
continue BlockHashes
}
// No peers found that have announced this data.
delete(m.requestedBlocks, blockHash)
}
for pp, requestQueue := range requestQueues {
var numRequested int32
gdmsg := wire.NewMsgGetData()
for _, iv := range requestQueue {
gdmsg.AddInvVect(iv)
numRequested++
if numRequested == wire.MaxInvPerMsg {
// Send full getdata message and reset.
pp.QueueMessage(gdmsg, nil)
gdmsg = wire.NewMsgGetData()
numRequested = 0
}
}

if len(gdmsg.InvList) > 0 {
pp.QueueMessage(gdmsg, nil)
}
}

// Attempt to find a new peer to sync from and reset the final requested
// block when the quitting peer is the sync peer.
Expand Down

0 comments on commit 1510b02

Please sign in to comment.