Skip to content

Commit

Permalink
pool: Keep hub waitgroup local.
Browse files Browse the repository at this point in the history
The waitgroup for the hub is currently being passed down into all
various configuration structs for each of the goroutines it runs which
requires a lot of plumbing and is quite error prone.

This addresses that by modifying the logic to keep the hub waitgroup
local to the run method and removes the aforementioned unnecessary
plumbing.
  • Loading branch information
davecgh committed Sep 13, 2023
1 parent 2392120 commit fb088e0
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 17 deletions.
3 changes: 0 additions & 3 deletions pool/chainstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ type ChainStateConfig struct {
Cancel context.CancelFunc
// SignalCache sends the provided cache update event to the gui cache.
SignalCache func(event CacheUpdateEvent)
// HubWg represents the hub's waitgroup.
HubWg *sync.WaitGroup
}

// blockNotification wraps a block header notification and a done channel.
Expand Down Expand Up @@ -201,7 +199,6 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
case <-ctx.Done():
close(cs.discCh)
close(cs.connCh)
cs.cfg.HubWg.Done()
return

case msg := <-cs.connCh:
Expand Down
3 changes: 0 additions & 3 deletions pool/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type EndpointConfig struct {
MaxConnectionsPerHost uint32
// MaxGenTime represents the share creation target time for the pool.
MaxGenTime time.Duration
// HubWg represents the hub's waitgroup.
HubWg *sync.WaitGroup
// FetchMinerDifficulty returns the difficulty information for the
// provided miner if it exists.
FetchMinerDifficulty func(string) (*DifficultyInfo, error)
Expand Down Expand Up @@ -215,7 +213,6 @@ func (e *Endpoint) disconnect(ctx context.Context) {
e.clientsMtx.Unlock()

e.wg.Done()
e.cfg.HubWg.Done()
return

case <-e.discCh:
Expand Down
21 changes: 13 additions & 8 deletions pool/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type Hub struct {
connectionsMtx sync.RWMutex
endpoint *Endpoint
blake256Pad []byte
wg *sync.WaitGroup
wg sync.WaitGroup
cacheCh chan CacheUpdateEvent
}

Expand Down Expand Up @@ -228,7 +228,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
h := &Hub{
cfg: hcfg,
limiter: NewRateLimiter(),
wg: new(sync.WaitGroup),
connections: make(map[string]uint32),
cacheCh: make(chan CacheUpdateEvent, bufferSize),
}
Expand Down Expand Up @@ -259,7 +258,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn },
CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout,
SignalCache: h.SignalCache,
HubWg: h.wg,
}

var err error
Expand All @@ -277,7 +275,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
GetBlockConfirmations: h.getBlockConfirmations,
Cancel: cancel,
SignalCache: h.SignalCache,
HubWg: h.wg,
}
h.chainState = NewChainState(sCfg)

Expand Down Expand Up @@ -320,7 +317,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
Blake256Pad: h.blake256Pad,
NonceIterations: h.cfg.NonceIterations,
MaxConnectionsPerHost: h.cfg.MaxConnectionsPerHost,
HubWg: h.wg,
FetchMinerDifficulty: h.poolDiffs.fetchMinerDifficulty,
SubmitWork: h.submitWork,
FetchCurrentWork: h.chainState.fetchCurrentWork,
Expand Down Expand Up @@ -632,9 +628,18 @@ func (h *Hub) shutdown() {
// Run handles the process lifecycles of the pool hub.
func (h *Hub) Run(ctx context.Context) {
h.wg.Add(3)
go h.endpoint.run(ctx)
go h.chainState.handleChainUpdates(ctx)
go h.paymentMgr.handlePayments(ctx)
go func(ctx context.Context) {
h.endpoint.run(ctx)
h.wg.Done()
}(ctx)
go func(ctx context.Context) {
h.chainState.handleChainUpdates(ctx)
h.wg.Done()
}(ctx)
go func(ctx context.Context) {
h.paymentMgr.handlePayments(ctx)
h.wg.Done()
}(ctx)

// Wait until all hub processes have terminated, and then shutdown.
h.wg.Wait()
Expand Down
4 changes: 1 addition & 3 deletions pool/paymentmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ type PaymentMgrConfig struct {
CoinbaseConfTimeout time.Duration
// SignalCache sends the provided cache update event to the gui cache.
SignalCache func(event CacheUpdateEvent)
// HubWg represents the hub's waitgroup.
HubWg *sync.WaitGroup
}

// paymentMsg represents a payment processing signal.
Expand Down Expand Up @@ -942,11 +940,11 @@ func (pm *PaymentMgr) processPayments(msg *paymentMsg) {
}

// handlePayments processes dividend payment signals.
// This MUST be run as a goroutine.
func (pm *PaymentMgr) handlePayments(ctx context.Context) {
for {
select {
case <-ctx.Done():
pm.cfg.HubWg.Done()
return

case msg := <-pm.paymentCh:
Expand Down

0 comments on commit fb088e0

Please sign in to comment.