From 1510b02e602808dc0c8f4acec0bb169869c03cc0 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 8 Mar 2023 10:07:32 -0500 Subject: [PATCH] netsync: Re-request data sooner after peer disconnect. This change speeds up the relay of blocks and transactions when the peer we have queried for this data disconnects. If any other peers have also announced this data, request the data again immediately from them instead of waiting for a slower peer to announce the data again through an inventory message. --- internal/netsync/manager.go | 59 ++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index a453672ac6..7f2d3db0d3 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -650,19 +650,64 @@ func (m *SyncManager) handleDonePeerMsg(p *peerpkg.Peer) { // Remove the peer from the list of candidate peers. delete(m.peers, p) - // Remove requested transactions from the global map so that they will - // be fetched from elsewhere. + // Re-request in-flight blocks and transactions that were not received + // by the disconnected peer if the data was announced by another peer. + // Remove the data from the manager's requested data maps if no other + // peers have announced the data. + requestQueues := make(map[*peerpkg.Peer][]*wire.InvVect) + var inv wire.InvVect + inv.Type = wire.InvTypeTx +TxHashes: for txHash := range peer.requestedTxns { + inv.Hash = txHash + for pp, mgrPeer := range m.peers { + if !pp.IsKnownInventory(&inv) { + continue + } + inv := inv + invs := append(requestQueues[pp], &inv) + requestQueues[pp] = invs + mgrPeer.requestedTxns[txHash] = struct{}{} + continue TxHashes + } + // No peers found that have announced this data. delete(m.requestedTxns, txHash) } - - // Remove requested blocks from the global map so that they will be - // fetched from elsewhere. - // TODO(oga) we could possibly here check which peers have these blocks - // and request them now to speed things up a little. + inv.Type = wire.InvTypeBlock +BlockHashes: for blockHash := range peer.requestedBlocks { + inv.Hash = blockHash + for pp, mgrPeer := range m.peers { + if !pp.IsKnownInventory(&inv) { + continue + } + inv := inv + invs := append(requestQueues[pp], &inv) + requestQueues[pp] = invs + mgrPeer.requestedBlocks[blockHash] = struct{}{} + continue BlockHashes + } + // No peers found that have announced this data. delete(m.requestedBlocks, blockHash) } + for pp, requestQueue := range requestQueues { + var numRequested int32 + gdmsg := wire.NewMsgGetData() + for _, iv := range requestQueue { + gdmsg.AddInvVect(iv) + numRequested++ + if numRequested == wire.MaxInvPerMsg { + // Send full getdata message and reset. + pp.QueueMessage(gdmsg, nil) + gdmsg = wire.NewMsgGetData() + numRequested = 0 + } + } + + if len(gdmsg.InvList) > 0 { + pp.QueueMessage(gdmsg, nil) + } + } // Attempt to find a new peer to sync from and reset the final requested // block when the quitting peer is the sync peer.