diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index a453672ac6..7f2d3db0d3 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -650,19 +650,64 @@ func (m *SyncManager) handleDonePeerMsg(p *peerpkg.Peer) { // Remove the peer from the list of candidate peers. delete(m.peers, p) - // Remove requested transactions from the global map so that they will - // be fetched from elsewhere. + // Re-request in-flight blocks and transactions that were not received + // by the disconnected peer if the data was announced by another peer. + // Remove the data from the manager's requested data maps if no other + // peers have announced the data. + requestQueues := make(map[*peerpkg.Peer][]*wire.InvVect) + var inv wire.InvVect + inv.Type = wire.InvTypeTx +TxHashes: for txHash := range peer.requestedTxns { + inv.Hash = txHash + for pp, mgrPeer := range m.peers { + if !pp.IsKnownInventory(&inv) { + continue + } + inv := inv + invs := append(requestQueues[pp], &inv) + requestQueues[pp] = invs + mgrPeer.requestedTxns[txHash] = struct{}{} + continue TxHashes + } + // No peers found that have announced this data. delete(m.requestedTxns, txHash) } - - // Remove requested blocks from the global map so that they will be - // fetched from elsewhere. - // TODO(oga) we could possibly here check which peers have these blocks - // and request them now to speed things up a little. + inv.Type = wire.InvTypeBlock +BlockHashes: for blockHash := range peer.requestedBlocks { + inv.Hash = blockHash + for pp, mgrPeer := range m.peers { + if !pp.IsKnownInventory(&inv) { + continue + } + inv := inv + invs := append(requestQueues[pp], &inv) + requestQueues[pp] = invs + mgrPeer.requestedBlocks[blockHash] = struct{}{} + continue BlockHashes + } + // No peers found that have announced this data. delete(m.requestedBlocks, blockHash) } + for pp, requestQueue := range requestQueues { + var numRequested int32 + gdmsg := wire.NewMsgGetData() + for _, iv := range requestQueue { + gdmsg.AddInvVect(iv) + numRequested++ + if numRequested == wire.MaxInvPerMsg { + // Send full getdata message and reset. + pp.QueueMessage(gdmsg, nil) + gdmsg = wire.NewMsgGetData() + numRequested = 0 + } + } + + if len(gdmsg.InvList) > 0 { + pp.QueueMessage(gdmsg, nil) + } + } // Attempt to find a new peer to sync from and reset the final requested // block when the quitting peer is the sync peer.