From f9ffa9e3358ccb2001859a5605f77b6ddaedf4b2 Mon Sep 17 00:00:00 2001
From: Dave Collins
Date: Wed, 13 Sep 2023 14:45:03 -0500
Subject: [PATCH] pool: Keep hub waitgroup local.

The hub's waitgroup is currently being passed down into the various
configuration structs for each of the goroutines it runs, which requires a
lot of plumbing and is quite error prone.

This addresses that by modifying the logic to keep the hub waitgroup local
to the Run method and removes the aforementioned unnecessary plumbing.

These changes have also exposed an issue with endpoint.run in that it never
terminates, but this commit does not fix that issue.
---
 pool/chainstate.go      |  3 ---
 pool/chainstate_test.go | 11 +++++++----
 pool/endpoint.go        |  3 ---
 pool/endpoint_test.go   | 13 +++++++++----
 pool/hub.go             | 21 +++++++++++++--------
 pool/paymentmgr.go      |  4 +---
 pool/paymentmgr_test.go | 11 +++++++----
 7 files changed, 37 insertions(+), 29 deletions(-)

diff --git a/pool/chainstate.go b/pool/chainstate.go
index 03ce5da4..6ab9630d 100644
--- a/pool/chainstate.go
+++ b/pool/chainstate.go
@@ -45,8 +45,6 @@ type ChainStateConfig struct {
 	Cancel context.CancelFunc
 	// SignalCache sends the provided cache update event to the gui cache.
 	SignalCache func(event CacheUpdateEvent)
-	// HubWg represents the hub's waitgroup.
-	HubWg *sync.WaitGroup
 }
 
 // blockNotification wraps a block header notification and a done channel.
@@ -201,7 +199,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
 		case <-ctx.Done():
 			close(cs.discCh)
 			close(cs.connCh)
-			cs.cfg.HubWg.Done()
 			return
 
 		case msg := <-cs.connCh:
diff --git a/pool/chainstate_test.go b/pool/chainstate_test.go
index 2405854b..9524b50f 100644
--- a/pool/chainstate_test.go
+++ b/pool/chainstate_test.go
@@ -82,7 +82,6 @@ func testChainState(t *testing.T) {
 		GetBlockConfirmations: getBlockConfirmations,
 		SignalCache:           signalCache,
 		Cancel:                cancel,
-		HubWg:                 new(sync.WaitGroup),
 	}
 	cs := NewChainState(cCfg)
 
@@ -230,8 +229,12 @@ func testChainState(t *testing.T) {
 
 	cs.cfg.GetBlock = getBlock
 
-	cCfg.HubWg.Add(1)
-	go cs.handleChainUpdates(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		cs.handleChainUpdates(ctx)
+		wg.Done()
+	}()
 
 	// Create the accepted work to be confirmed.
 	work := NewAcceptedWork(
@@ -389,5 +392,5 @@ func testChainState(t *testing.T) {
 	}
 
 	cancel()
-	cs.cfg.HubWg.Wait()
+	wg.Wait()
 }
diff --git a/pool/endpoint.go b/pool/endpoint.go
index 12197b6e..c596cfd7 100644
--- a/pool/endpoint.go
+++ b/pool/endpoint.go
@@ -36,8 +36,6 @@ type EndpointConfig struct {
 	MaxConnectionsPerHost uint32
 	// MaxGenTime represents the share creation target time for the pool.
 	MaxGenTime time.Duration
-	// HubWg represents the hub's waitgroup.
-	HubWg *sync.WaitGroup
 	// FetchMinerDifficulty returns the difficulty information for the
 	// provided miner if it exists.
 	FetchMinerDifficulty func(string) (*DifficultyInfo, error)
@@ -215,7 +213,6 @@ func (e *Endpoint) disconnect(ctx context.Context) {
 			e.clientsMtx.Unlock()
 
 			e.wg.Done()
-			e.cfg.HubWg.Done()
 			return
 
 		case <-e.discCh:
diff --git a/pool/endpoint_test.go b/pool/endpoint_test.go
index d0f0d910..353b4697 100644
--- a/pool/endpoint_test.go
+++ b/pool/endpoint_test.go
@@ -44,7 +44,6 @@ func testEndpoint(t *testing.T) {
 		Blake256Pad:           blake256Pad,
 		NonceIterations:       iterations,
 		MaxConnectionsPerHost: 3,
-		HubWg:                 new(sync.WaitGroup),
 		FetchMinerDifficulty: func(miner string) (*DifficultyInfo, error) {
 			return poolDiffs.fetchMinerDifficulty(miner)
 		},
@@ -83,10 +82,14 @@ func testEndpoint(t *testing.T) {
 	if err != nil {
 		t.Fatalf("[NewEndpoint] unexpected error: %v", err)
 	}
-	endpoint.cfg.HubWg.Add(1)
 	ctx, cancel := context.WithCancel(context.Background())
 	endpoint.wg.Add(1)
-	go endpoint.run(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		endpoint.run(ctx)
+		wg.Done()
+	}()
 	time.Sleep(time.Millisecond * 100)
 
 	laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:3031")
@@ -232,5 +235,7 @@ func testEndpoint(t *testing.T) {
 	defer conn.Close()
 
 	cancel()
-	endpoint.cfg.HubWg.Wait()
+	// TODO: This never finishes because endpoint.run never actually finishes
+	// due to the internal waitgroup not being handled properly.
+	// wg.Wait()
 }
diff --git a/pool/hub.go b/pool/hub.go
index a8d67a4d..1f941bcb 100644
--- a/pool/hub.go
+++ b/pool/hub.go
@@ -194,7 +194,7 @@ type Hub struct {
 	connectionsMtx sync.RWMutex
 	endpoint       *Endpoint
 	blake256Pad    []byte
-	wg             *sync.WaitGroup
+	wg             sync.WaitGroup
 	cacheCh        chan CacheUpdateEvent
 }
@@ -228,7 +228,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
 	h := &Hub{
 		cfg:         hcfg,
 		limiter:     NewRateLimiter(),
-		wg:          new(sync.WaitGroup),
 		connections: make(map[string]uint32),
 		cacheCh:     make(chan CacheUpdateEvent, bufferSize),
 	}
@@ -259,7 +258,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
 		FetchTxBroadcaster:  func() TxBroadcaster { return h.walletConn },
 		CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout,
 		SignalCache:         h.SignalCache,
-		HubWg:               h.wg,
 	}
 
 	var err error
@@ -277,7 +275,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
 		GetBlockConfirmations: h.getBlockConfirmations,
 		Cancel:                cancel,
 		SignalCache:           h.SignalCache,
-		HubWg:                 h.wg,
 	}
 	h.chainState = NewChainState(sCfg)
 
@@ -320,7 +317,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
 		Blake256Pad:           h.blake256Pad,
 		NonceIterations:       h.cfg.NonceIterations,
 		MaxConnectionsPerHost: h.cfg.MaxConnectionsPerHost,
-		HubWg:                 h.wg,
 		FetchMinerDifficulty:  h.poolDiffs.fetchMinerDifficulty,
 		SubmitWork:            h.submitWork,
 		FetchCurrentWork:      h.chainState.fetchCurrentWork,
@@ -632,9 +628,18 @@ func (h *Hub) shutdown() {
 // Run handles the process lifecycles of the pool hub.
 func (h *Hub) Run(ctx context.Context) {
 	h.wg.Add(3)
-	go h.endpoint.run(ctx)
-	go h.chainState.handleChainUpdates(ctx)
-	go h.paymentMgr.handlePayments(ctx)
+	go func() {
+		h.endpoint.run(ctx)
+		h.wg.Done()
+	}()
+	go func() {
+		h.chainState.handleChainUpdates(ctx)
+		h.wg.Done()
+	}()
+	go func() {
+		h.paymentMgr.handlePayments(ctx)
+		h.wg.Done()
+	}()
 
 	// Wait until all hub processes have terminated, and then shutdown.
 	h.wg.Wait()
diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go
index 5432a2b6..f68c1e08 100644
--- a/pool/paymentmgr.go
+++ b/pool/paymentmgr.go
@@ -117,8 +117,6 @@ type PaymentMgrConfig struct {
 	CoinbaseConfTimeout time.Duration
 	// SignalCache sends the provided cache update event to the gui cache.
 	SignalCache func(event CacheUpdateEvent)
-	// HubWg represents the hub's waitgroup.
-	HubWg *sync.WaitGroup
 }
 
 // paymentMsg represents a payment processing signal.
@@ -942,11 +940,11 @@ func (pm *PaymentMgr) processPayments(msg *paymentMsg) {
 }
 
 // handlePayments processes dividend payment signals.
+// This MUST be run as a goroutine.
 func (pm *PaymentMgr) handlePayments(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			pm.cfg.HubWg.Done()
 			return
 
 		case msg := <-pm.paymentCh:
diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go
index 77f770a7..e940c7fa 100644
--- a/pool/paymentmgr_test.go
+++ b/pool/paymentmgr_test.go
@@ -193,7 +193,6 @@ func createPaymentMgr(t *testing.T, paymentMethod string) (*PaymentMgr, context.
 		PoolFeeAddrs:        []stdaddr.Address{poolFeeAddrs},
 		SignalCache:         signalCache,
 		CoinbaseConfTimeout: time.Millisecond * 200,
-		HubWg:               new(sync.WaitGroup),
 	}
 
 	mgr, err := NewPaymentMgr(pCfg)
@@ -1504,8 +1503,12 @@ func testPaymentMgrSignals(t *testing.T) {
 		Done: make(chan bool),
 	}
 
-	mgr.cfg.HubWg.Add(1)
-	go mgr.handlePayments(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		mgr.handlePayments(ctx)
+		wg.Done()
+	}()
 
 	mgr.processPayments(&msgA)
 	<-msgA.Done
@@ -1537,5 +1540,5 @@ func testPaymentMgrSignals(t *testing.T) {
 	<-msgB.Done
 
 	cancel()
-	mgr.cfg.HubWg.Wait()
+	wg.Wait()
 }
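
For reference, a minimal standalone sketch of the pattern this patch adopts: the
waitgroup stays local to the coordinating function, and each goroutine is wrapped
in a closure that calls Done, so nothing needs to be plumbed through configuration
structs. The worker function and timing below are hypothetical stand-ins for
illustration only, not part of the pool package.

package main

import (
	"context"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for blocking methods such as
// endpoint.run or chainState.handleChainUpdates; it returns once the
// context is canceled.
func worker(ctx context.Context) {
	<-ctx.Done()
}

// run coordinates its goroutines with a waitgroup that never leaves this
// function, so no Done calls need to be threaded through config structs.
func run(ctx context.Context) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		worker(ctx)
		wg.Done()
	}()
	go func() {
		worker(ctx)
		wg.Done()
	}()

	// Wait until all goroutines have terminated.
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()
	run(ctx)
}

Keeping Add and Done in the same function makes it straightforward to confirm by
inspection that the counts match, which is harder when Done calls live in other
packages behind configuration fields.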