Skip to content

Commit

Permalink
peer,main: propagate p2p mixing messages
Browse files Browse the repository at this point in the history
This commit adds the mixing message pool to the server and listens for all
peer-to-peer mixing messages broadcast on the network.  If these messages are
able to be accepted to the mixpool, they are relayed to other peers through
the inventory system, and notified to RPC clients.

At startup, all current pair request messages are requested from peers, but
the entire mixing pools are not synchronized.
  • Loading branch information
jrick committed Nov 8, 2023
1 parent 387b833 commit a379d96
Show file tree
Hide file tree
Showing 17 changed files with 703 additions and 48 deletions.
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/decred/dcrd/gcs/v4 v4.0.0
github.com/decred/dcrd/lru v1.1.2
github.com/decred/dcrd/math/uint256 v1.0.1
github.com/decred/dcrd/mixing v0.0.0
github.com/decred/dcrd/peer/v3 v3.0.2
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.0.0
github.com/decred/dcrd/rpcclient/v8 v8.0.0
Expand All @@ -38,18 +39,20 @@ require (
github.com/jrick/bitset v1.0.0
github.com/jrick/logrotate v1.0.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
golang.org/x/sys v0.8.0
golang.org/x/term v0.5.0
golang.org/x/sys v0.13.0
golang.org/x/term v0.13.0
lukechampine.com/blake3 v1.2.1
)

require (
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect
github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a // indirect
github.com/dchest/siphash v1.2.3 // indirect
github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect
github.com/decred/dcrd/hdkeychain/v3 v3.1.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
golang.org/x/crypto v0.7.0 // indirect
)

replace (
Expand All @@ -75,6 +78,7 @@ replace (
github.com/decred/dcrd/limits => ./limits
github.com/decred/dcrd/lru => ./lru
github.com/decred/dcrd/math/uint256 => ./math/uint256
github.com/decred/dcrd/mixing => ./mixing
github.com/decred/dcrd/peer/v3 => ./peer
github.com/decred/dcrd/rpc/jsonrpc/types/v4 => ./rpc/jsonrpc/types
github.com/decred/dcrd/rpcclient/v8 => ./rpcclient
Expand Down
18 changes: 12 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
decred.org/cspp/v2 v2.0.1-0.20230307024253-8a22691aa376 h1:739v8a7LMXuCTFNodcKYVpfj70CKWvJeE3NKFDn/65I=
decred.org/cspp/v2 v2.0.1-0.20230307024253-8a22691aa376/go.mod h1:+/9jr1RhVshWnc0U/eXxMlxfiu9/f7ia6TTyS0Oh5n0=
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI=
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0=
github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a/go.mod h1:z/9Ck1EDixEbBbZ2KH2qNHekEmDLTOZ+FyoIPWWSVOI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
Expand Down Expand Up @@ -53,11 +57,13 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc h1:zK/HqS5bZxDptfPJNq8v7vJfXtkU7r9TLIoSr1bXaP4=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -69,14 +75,14 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
Expand Down
5 changes: 5 additions & 0 deletions internal/blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ type BlockChain struct {
bulkImportMode bool
}

// ChainParams returns the chain parameters.
func (b *BlockChain) ChainParams() *chaincfg.Params {
return b.chainParams
}

const (
// stakeMajorityCacheKeySize is comprised of the stake version and the
// hash size. The stake version is a little endian uint32, hence we
Expand Down
5 changes: 5 additions & 0 deletions internal/netsync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package netsync

import (
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/mixing"
)

// PeerNotifier provides an interface to notify peers of status changes related
Expand All @@ -14,4 +15,8 @@ type PeerNotifier interface {
// AnnounceNewTransactions generates and relays inventory vectors and
// notifies websocket clients of the passed transactions.
AnnounceNewTransactions(txns []*dcrutil.Tx)

// AnnounceMixMessage generates and relays inventory vectors of the
// passed messages.
AnnounceMixMessages(msgs []mixing.Message)
}
173 changes: 147 additions & 26 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/progresslog"
"github.com/decred/dcrd/math/uint256"
"github.com/decred/dcrd/mixing"
"github.com/decred/dcrd/mixing/mixpool"
peerpkg "github.com/decred/dcrd/peer/v3"
"github.com/decred/dcrd/wire"
)
Expand Down Expand Up @@ -64,6 +66,10 @@ const (
maxRejectedTxns = 62500
rejectedTxnsFPRate = 0.0000001

// XXX these numbers were copied from rejected txns
maxRejectedMixMsgs = 625000
rejectedMixMsgsFPRate = 0.0000001

// maxRequestedBlocks is the maximum number of requested block
// hashes to store in memory.
maxRequestedBlocks = wire.MaxInvPerMsg
Expand All @@ -72,6 +78,10 @@ const (
// hashes to store in memory.
maxRequestedTxns = wire.MaxInvPerMsg

// maxRequestedMixMsgs is the maximum number of hashes of in-flight
// mixing messages.
maxRequestedMixMsgs = wire.MaxInvPerMsg

// maxExpectedHeaderAnnouncementsPerMsg is the maximum number of headers in
// a single message that is expected when determining when the message
// appears to be announcing new blocks.
Expand Down Expand Up @@ -179,15 +189,24 @@ type processBlockMsg struct {
reply chan processBlockResponse
}

// mixMsg is a message type to be sent across the message channel for requesting
// a message's acceptence to the mixing pool.
type mixMsg struct {
msg mixing.Message
peer *Peer
reply chan struct{}
}

// Peer extends a common peer to maintain additional state needed by the sync
// manager. The internals are intentionally unexported to create an opaque
// type.
type Peer struct {
*peerpkg.Peer

syncCandidate bool
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
syncCandidate bool
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
requestedMixMsgs map[chainhash.Hash]struct{}

// initialStateRequested tracks whether or not the initial state data has
// been requested from the peer.
Expand All @@ -207,10 +226,11 @@ type Peer struct {
func NewPeer(peer *peerpkg.Peer) *Peer {
isSyncCandidate := peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
return &Peer{
Peer: peer,
syncCandidate: isSyncCandidate,
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
Peer: peer,
syncCandidate: isSyncCandidate,
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
}
}

Expand Down Expand Up @@ -291,13 +311,15 @@ type SyncManager struct {
// time.
minKnownWork *uint256.Uint256

rejectedTxns *apbf.Filter
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
progressLogger *progresslog.Logger
syncPeer *Peer
msgChan chan interface{}
peers map[*Peer]struct{}
rejectedTxns *apbf.Filter
rejectedMixMsgs *apbf.Filter
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
requestedMixMsgs map[chainhash.Hash]struct{}
progressLogger *progresslog.Logger
syncPeer *Peer
msgChan chan interface{}
peers map[*Peer]struct{}

// hdrSyncState houses the state used to track the initial header sync
// process and related stall handling.
Expand Down Expand Up @@ -567,6 +589,9 @@ func maybeRequestInitialState(peer *Peer) {
err := m.AddTypes(wire.InitStateHeadBlocks,
wire.InitStateHeadBlockVotes,
wire.InitStateTSpends)
if err == nil && peer.ProtocolVersion() >= wire.MixVersion {
err = m.AddType(wire.InitStateMixPRs)
}
if err != nil {
log.Errorf("Unexpected error building getinitstate msg: %v", err)
return
Expand Down Expand Up @@ -666,6 +691,22 @@ BlockHashes:
// No peers found that have announced this data.
delete(m.requestedBlocks, blockHash)
}
inv.Type = wire.InvTypeMix
MixHashes:
for mixHash := range peer.requestedMixMsgs {
inv.Hash = mixHash
for pp := range m.peers {
if !pp.IsKnownInventory(&inv) {
continue
}
invs := append(requestQueues[pp], inv)
requestQueues[pp] = invs
pp.requestedMixMsgs[mixHash] = struct{}{}
continue MixHashes
}
// No peers found that have announced this data.
delete(m.requestedMixMsgs, mixHash)
}
for pp, requestQueue := range requestQueues {
var numRequested int32
gdmsg := wire.NewMsgGetData()
Expand Down Expand Up @@ -754,6 +795,28 @@ func (m *SyncManager) handleTxMsg(tmsg *txMsg) {
m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs)
}

// handleMixMsg handles mixing messages from all peers.
func (m *SyncManager) handleMixMsg(mmsg *mixMsg) {
peer := mmsg.peer

mixHash := mmsg.msg.Hash()

accepted, err := m.cfg.MixPool.AcceptMessage(mmsg.msg)
if err != nil {
log.Errorf("Failed to process %T mixing message %v: %v",
mmsg.msg, &mixHash, err)
return
}
if accepted == nil {
return
}

delete(peer.requestedMixMsgs, mixHash)
delete(m.requestedMixMsgs, mixHash)

m.cfg.PeerNotifier.AnnounceMixMessages([]mixing.Message{accepted})
}

// maybeUpdateIsCurrent potentially updates the manager to signal it believes
// the chain is considered synced.
//
Expand Down Expand Up @@ -1275,6 +1338,19 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool {
return true
}

// needMixMsg returns whether or not the mixing message needs to be downloaded.
func (m *SyncManager) needMixMsg(hash *chainhash.Hash) bool {
if m.rejectedMixMsgs.Contains(hash[:]) {
return false
}

if m.cfg.MixPool.HaveMessage(hash) {
return false
}

return true
}

// handleInvMsg handles inv messages from all peers. This entails examining the
// inventory advertised by the remote peer for block and transaction
// announcements and acting accordingly.
Expand Down Expand Up @@ -1332,6 +1408,29 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) {
limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns)
requestQueue = append(requestQueue, iv)
}

case wire.InvTypeMix:
// Add the mix message to the cache of known inventory
// for the peer. This helps avoid sending mix messages
// to the peer that it is already known to have.
peer.AddKnownInventory(iv)

// Ignore mixing messages before the chain is current or
// if the messages are not needed. Pair request (PR)
// messages reference unspent outputs that must be
// checked to exist and be unspent before they are
// accepted, and all later messages must reference an
// existing PR recorded in the mixing pool.
if !isCurrent || !m.needMixMsg(&iv.Hash) {
continue
}

// Request the mixing message if it is not already pending.
if _, exists := m.requestedMixMsgs[iv.Hash]; !exists {
limitAdd(m.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs)
limitAdd(peer.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs)
requestQueue = append(requestQueue, iv)
}
}
}

Expand Down Expand Up @@ -1420,6 +1519,13 @@ out:
case <-ctx.Done():
}

case *mixMsg:
m.handleMixMsg(msg)
select {
case msg.reply <- struct{}{}:
case <-ctx.Done():
}

case *invMsg:
m.handleInvMsg(msg)

Expand Down Expand Up @@ -1538,6 +1644,15 @@ func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *Peer) {
}
}

// QueueMixMsg adds the passed mixing message and peer to the event handling
// queue.
func (m *SyncManager) QueueMixMsg(msg mixing.Message, peer *Peer, done chan struct{}) {
select {
case m.msgChan <- &mixMsg{msg: msg, peer: peer, reply: done}:
case <-m.quit:
}
}

// QueueNotFound adds the passed notfound message and peer to the event handling
// queue.
func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *Peer) {
Expand Down Expand Up @@ -1803,6 +1918,10 @@ type Config struct {
// and querying the most recently confirmed transactions. It is useful for
// preventing duplicate requests.
RecentlyConfirmedTxns *apbf.Filter

// MixPool specifies the mixing pool to use for transient mixing
// messages broadcast across the network.
MixPool *mixpool.Pool
}

// New returns a new network chain synchronization manager. Use Run to begin
Expand All @@ -1819,17 +1938,19 @@ func New(config *Config) *SyncManager {
}

return &SyncManager{
cfg: *config,
rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
peers: make(map[*Peer]struct{}),
minKnownWork: minKnownWork,
hdrSyncState: makeHeaderSyncState(),
progressLogger: progresslog.New("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
quit: make(chan struct{}),
syncHeight: config.Chain.BestSnapshot().Height,
isCurrent: config.Chain.IsCurrent(),
cfg: *config,
rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate),
rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
peers: make(map[*Peer]struct{}),
minKnownWork: minKnownWork,
hdrSyncState: makeHeaderSyncState(),
progressLogger: progresslog.New("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
quit: make(chan struct{}),
syncHeight: config.Chain.BestSnapshot().Height,
isCurrent: config.Chain.IsCurrent(),
}
}
Loading

0 comments on commit a379d96

Please sign in to comment.