diff --git a/pool/chainstate.go b/pool/chainstate.go index 03ce5da4..6ab9630d 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -45,8 +45,6 @@ type ChainStateConfig struct { Cancel context.CancelFunc // SignalCache sends the provided cache update event to the gui cache. SignalCache func(event CacheUpdateEvent) - // HubWg represents the hub's waitgroup. - HubWg *sync.WaitGroup } // blockNotification wraps a block header notification and a done channel. @@ -201,7 +199,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { case <-ctx.Done(): close(cs.discCh) close(cs.connCh) - cs.cfg.HubWg.Done() return case msg := <-cs.connCh: diff --git a/pool/chainstate_test.go b/pool/chainstate_test.go index 2405854b..9524b50f 100644 --- a/pool/chainstate_test.go +++ b/pool/chainstate_test.go @@ -82,7 +82,6 @@ func testChainState(t *testing.T) { GetBlockConfirmations: getBlockConfirmations, SignalCache: signalCache, Cancel: cancel, - HubWg: new(sync.WaitGroup), } cs := NewChainState(cCfg) @@ -230,8 +229,12 @@ func testChainState(t *testing.T) { cs.cfg.GetBlock = getBlock - cCfg.HubWg.Add(1) - go cs.handleChainUpdates(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + cs.handleChainUpdates(ctx) + wg.Done() + }() // Create the accepted work to be confirmed. work := NewAcceptedWork( @@ -389,5 +392,5 @@ func testChainState(t *testing.T) { } cancel() - cs.cfg.HubWg.Wait() + wg.Wait() } diff --git a/pool/endpoint.go b/pool/endpoint.go index 12197b6e..c596cfd7 100644 --- a/pool/endpoint.go +++ b/pool/endpoint.go @@ -36,8 +36,6 @@ type EndpointConfig struct { MaxConnectionsPerHost uint32 // MaxGenTime represents the share creation target time for the pool. MaxGenTime time.Duration - // HubWg represents the hub's waitgroup. - HubWg *sync.WaitGroup // FetchMinerDifficulty returns the difficulty information for the // provided miner if it exists. FetchMinerDifficulty func(string) (*DifficultyInfo, error) @@ -215,7 +213,6 @@ func (e *Endpoint) disconnect(ctx context.Context) { e.clientsMtx.Unlock() e.wg.Done() - e.cfg.HubWg.Done() return case <-e.discCh: diff --git a/pool/endpoint_test.go b/pool/endpoint_test.go index d0f0d910..353b4697 100644 --- a/pool/endpoint_test.go +++ b/pool/endpoint_test.go @@ -44,7 +44,6 @@ func testEndpoint(t *testing.T) { Blake256Pad: blake256Pad, NonceIterations: iterations, MaxConnectionsPerHost: 3, - HubWg: new(sync.WaitGroup), FetchMinerDifficulty: func(miner string) (*DifficultyInfo, error) { return poolDiffs.fetchMinerDifficulty(miner) }, @@ -83,10 +82,14 @@ func testEndpoint(t *testing.T) { if err != nil { t.Fatalf("[NewEndpoint] unexpected error: %v", err) } - endpoint.cfg.HubWg.Add(1) ctx, cancel := context.WithCancel(context.Background()) endpoint.wg.Add(1) - go endpoint.run(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + endpoint.run(ctx) + wg.Done() + }() time.Sleep(time.Millisecond * 100) laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:3031") @@ -232,5 +235,7 @@ func testEndpoint(t *testing.T) { defer conn.Close() cancel() - endpoint.cfg.HubWg.Wait() + // TODO: This never finishes because endpoint.run never actually finishes + // due to the internal waitgroup not being handled properly. + // wg.Wait() } diff --git a/pool/hub.go b/pool/hub.go index a8d67a4d..1f941bcb 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -194,7 +194,7 @@ type Hub struct { connectionsMtx sync.RWMutex endpoint *Endpoint blake256Pad []byte - wg *sync.WaitGroup + wg sync.WaitGroup cacheCh chan CacheUpdateEvent } @@ -228,7 +228,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { h := &Hub{ cfg: hcfg, limiter: NewRateLimiter(), - wg: new(sync.WaitGroup), connections: make(map[string]uint32), cacheCh: make(chan CacheUpdateEvent, bufferSize), } @@ -259,7 +258,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn }, CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout, SignalCache: h.SignalCache, - HubWg: h.wg, } var err error @@ -277,7 +275,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { GetBlockConfirmations: h.getBlockConfirmations, Cancel: cancel, SignalCache: h.SignalCache, - HubWg: h.wg, } h.chainState = NewChainState(sCfg) @@ -320,7 +317,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { Blake256Pad: h.blake256Pad, NonceIterations: h.cfg.NonceIterations, MaxConnectionsPerHost: h.cfg.MaxConnectionsPerHost, - HubWg: h.wg, FetchMinerDifficulty: h.poolDiffs.fetchMinerDifficulty, SubmitWork: h.submitWork, FetchCurrentWork: h.chainState.fetchCurrentWork, @@ -632,9 +628,18 @@ func (h *Hub) shutdown() { // Run handles the process lifecycles of the pool hub. func (h *Hub) Run(ctx context.Context) { h.wg.Add(3) - go h.endpoint.run(ctx) - go h.chainState.handleChainUpdates(ctx) - go h.paymentMgr.handlePayments(ctx) + go func() { + h.endpoint.run(ctx) + h.wg.Done() + }() + go func() { + h.chainState.handleChainUpdates(ctx) + h.wg.Done() + }() + go func() { + h.paymentMgr.handlePayments(ctx) + h.wg.Done() + }() // Wait until all hub processes have terminated, and then shutdown. h.wg.Wait() diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 5432a2b6..f68c1e08 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -117,8 +117,6 @@ type PaymentMgrConfig struct { CoinbaseConfTimeout time.Duration // SignalCache sends the provided cache update event to the gui cache. SignalCache func(event CacheUpdateEvent) - // HubWg represents the hub's waitgroup. - HubWg *sync.WaitGroup } // paymentMsg represents a payment processing signal. @@ -942,11 +940,11 @@ func (pm *PaymentMgr) processPayments(msg *paymentMsg) { } // handlePayments processes dividend payment signals. +// This MUST be run as a goroutine. func (pm *PaymentMgr) handlePayments(ctx context.Context) { for { select { case <-ctx.Done(): - pm.cfg.HubWg.Done() return case msg := <-pm.paymentCh: diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 77f770a7..e940c7fa 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -193,7 +193,6 @@ func createPaymentMgr(t *testing.T, paymentMethod string) (*PaymentMgr, context. PoolFeeAddrs: []stdaddr.Address{poolFeeAddrs}, SignalCache: signalCache, CoinbaseConfTimeout: time.Millisecond * 200, - HubWg: new(sync.WaitGroup), } mgr, err := NewPaymentMgr(pCfg) @@ -1504,8 +1503,12 @@ func testPaymentMgrSignals(t *testing.T) { Done: make(chan bool), } - mgr.cfg.HubWg.Add(1) - go mgr.handlePayments(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + mgr.handlePayments(ctx) + wg.Done() + }() mgr.processPayments(&msgA) <-msgA.Done @@ -1537,5 +1540,5 @@ func testPaymentMgrSignals(t *testing.T) { <-msgB.Done cancel() - mgr.cfg.HubWg.Wait() + wg.Wait() }