Skip to content

Commit

Permalink
main: Slightly rework main hub and gui lifecycle.
Browse files Browse the repository at this point in the history
This moves the creation of the context into the main code path, splits
the creation logic for the hub and gui into independent funcs, and
arranges for the main code path to create those instances and run them.

Since the pool struct is no longer necessary, both it and the associated
newPool func are removed.

This approach is preferred because it provides the main code path with
full control over the lifecycle of the context instead of stuffing it
into a separate struct which makes it much harder to reason about and
has ultimately led to the use of the cancel func deep in the innards of
the hub which should not have that level of control since it is not the
coordinator and thus does not have enough details to know when it's
actually safe or even should be forcing a shutdown.

This commit does not address the latter part since it will require more
in-depth changes to decouple it properly.  However, this is a good start
at reducing the overly-tight coupling of the lifecycle that exists as a
result.
  • Loading branch information
davecgh authored and jholdstock committed Sep 13, 2023
1 parent 2c44cbe commit 26ee72b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 66 deletions.
135 changes: 73 additions & 62 deletions dcrpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,24 @@ import (
// Conditional compilation is used to also include SIGTERM and SIGHUP on Unix.
var signals = []os.Signal{os.Interrupt}

// miningPool represents a decred proof-of-Work mining pool.
type miningPool struct {
ctx context.Context
cancel context.CancelFunc
hub *pool.Hub
gui *gui.GUI
}

// newPool initializes the mining pool.
func newPool(db pool.Database, cfg *config) (*miningPool, error) {
p := new(miningPool)
dcrdRPCCfg := &rpcclient.ConnConfig{
// newHub returns a new pool hub configured with the provided details that is
// ready to connect to a consensus daemon and wallet in the case of publicly
// available pools.
func newHub(cfg *config, db pool.Database, cancel context.CancelFunc) (*pool.Hub, error) {
dcrdRPCCfg := rpcclient.ConnConfig{
Host: cfg.DcrdRPCHost,
Endpoint: "ws",
User: cfg.RPCUser,
Pass: cfg.RPCPass,
Certificates: cfg.dcrdRPCCerts,
}
p.ctx, p.cancel = context.WithCancel(context.Background())
powLimit := cfg.net.PowLimit
powLimitF, _ := new(big.Float).SetInt(powLimit).Float64()
iterations := math.Pow(2, 256-math.Floor(math.Log2(powLimitF)))

hcfg := &pool.HubConfig{
DB: db,
NodeRPCConfig: dcrdRPCCfg,
NodeRPCConfig: &dcrdRPCCfg,
WalletRPCCert: cfg.WalletRPCCert,
WalletTLSCert: cfg.WalletTLSCert,
WalletTLSKey: cfg.WalletTLSKey,
Expand All @@ -75,23 +67,13 @@ func newPool(db pool.Database, cfg *config) (*miningPool, error) {
ClientTimeout: cfg.clientTimeout,
}

var err error
p.hub, err = pool.NewHub(p.cancel, hcfg)
if err != nil {
return nil, fmt.Errorf("unable to initialize hub: %w", err)
}

err = p.hub.Connect(p.ctx)
if err != nil {
return nil, fmt.Errorf("unable to establish node connections: %w", err)
}

err = p.hub.FetchWork(p.ctx)
if err != nil {
return nil, err
}
return pool.NewHub(cancel, hcfg)
}

csrfSecret, err := p.hub.CSRFSecret()
// newGUI returns a new GUI configured with the provided details that is ready
// to run.
func newGUI(cfg *config, hub *pool.Hub) (*gui.GUI, error) {
csrfSecret, err := hub.CSRFSecret()
if err != nil {
return nil, err
}
Expand All @@ -113,27 +95,23 @@ func newPool(db pool.Database, cfg *config) (*miningPool, error) {
PoolFee: cfg.PoolFee,
CSRFSecret: csrfSecret,
MinerListen: cfg.MinerListen,
WithinLimit: p.hub.WithinLimit,
FetchLastWorkHeight: p.hub.FetchLastWorkHeight,
FetchLastPaymentInfo: p.hub.FetchLastPaymentInfo,
FetchMinedWork: p.hub.FetchMinedWork,
FetchWorkQuotas: p.hub.FetchWorkQuotas,
FetchHashData: p.hub.FetchHashData,
AccountExists: p.hub.AccountExists,
FetchArchivedPayments: p.hub.FetchArchivedPayments,
FetchPendingPayments: p.hub.FetchPendingPayments,
FetchCacheChannel: p.hub.FetchCacheChannel,
WithinLimit: hub.WithinLimit,
FetchLastWorkHeight: hub.FetchLastWorkHeight,
FetchLastPaymentInfo: hub.FetchLastPaymentInfo,
FetchMinedWork: hub.FetchMinedWork,
FetchWorkQuotas: hub.FetchWorkQuotas,
FetchHashData: hub.FetchHashData,
AccountExists: hub.AccountExists,
FetchArchivedPayments: hub.FetchArchivedPayments,
FetchPendingPayments: hub.FetchPendingPayments,
FetchCacheChannel: hub.FetchCacheChannel,
}

if !cfg.UsePostgres {
gcfg.HTTPBackupDB = p.hub.HTTPBackupDB
gcfg.HTTPBackupDB = hub.HTTPBackupDB
}

p.gui, err = gui.NewGUI(gcfg)
if err != nil {
return nil, err
}
return p, nil
return gui.NewGUI(gcfg)
}

// realMain is the real main function for dcrpool. It is necessary to work
Expand Down Expand Up @@ -162,8 +140,16 @@ func realMain() error {
logRotator.Close()
}
}()

// Primary context that controls the entire process.
ctx, cancel := context.WithCancel(context.Background())
defer mpLog.Info("Shutdown complete")

// Show version and home dir at startup.
mpLog.Infof("%s version %s (Go version %s %s/%s)", appName,
Version, runtime.Version(), runtime.GOOS, runtime.GOARCH)
mpLog.Infof("Home dir: %s", cfg.HomeDir)

var db pool.Database
if cfg.UsePostgres {
db, err = pool.InitPostgresDB(cfg.PGHost, cfg.PGPort, cfg.PGUser,
Expand All @@ -172,17 +158,12 @@ func realMain() error {
db, err = pool.InitBoltDB(cfg.DBFile)
}
if err != nil {
cancel()
mpLog.Errorf("failed to initialize database: %v", err)
return err
}
defer db.Close()

p, err := newPool(db, cfg)
if err != nil {
mpLog.Errorf("failed to initialize pool: %v", err)
return err
}

if cfg.Profile != "" {
// Start the profiler.
go func() {
Expand All @@ -195,27 +176,57 @@ func realMain() error {
err := http.ListenAndServe(listenAddr, nil)
if err != nil {
mpLog.Criticalf(err.Error())
p.cancel()
cancel()
}
}()
}

mpLog.Infof("%s version %s (Go version %s %s/%s)", appName,
Version, runtime.Version(), runtime.GOOS, runtime.GOARCH)
mpLog.Infof("Home dir: %s", cfg.HomeDir)
mpLog.Infof("Started dcrpool")

go func() {
select {
case <-p.ctx.Done():
case <-ctx.Done():
return

case <-interrupt:
p.cancel()
cancel()
}
}()
p.gui.Run(p.ctx)
p.hub.Run(p.ctx)

// Create a hub and GUI instance.
hub, err := newHub(cfg, db, cancel)
if err != nil {
mpLog.Errorf("unable to initialize hub: %v", err)
return err
}
gui, err := newGUI(cfg, hub)
if err != nil {
mpLog.Errorf("unable to initialize GUI: %v", err)
return err
}

// Run the GUI in the background.
go gui.Run(ctx)

// Run the hub. This will block until the context is cancelled.
runHub := func(ctx context.Context, h *pool.Hub) error {
// Ideally these would go into hub.Run, but the tests don't work
// properly with this code there due to their tight coupling.
if err := h.Connect(ctx); err != nil {
return fmt.Errorf("unable to establish node connections: %w", err)
}

if err := h.FetchWork(ctx); err != nil {
return fmt.Errorf("unable to get work from consensus daemon: %w", err)
}

h.Run(ctx)
return nil
}
if err := runHub(ctx, hub); err != nil {
// Ensure the GUI is signaled to shutdown.
cancel()
mpLog.Errorf("unable to run pool hub: %v", err)
return err
}

// hub.Run() blocks until the pool is fully shut down. When it returns,
// write a backup of the DB (if not using postgres), and then close the DB.
Expand Down
5 changes: 1 addition & 4 deletions pool/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ type Hub struct {
chainState *ChainState
connections map[string]uint32
connectionsMtx sync.RWMutex
cancel context.CancelFunc
endpoint *Endpoint
blake256Pad []byte
wg *sync.WaitGroup
Expand Down Expand Up @@ -232,7 +231,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
wg: new(sync.WaitGroup),
connections: make(map[string]uint32),
cacheCh: make(chan CacheUpdateEvent, bufferSize),
cancel: cancel,
}
h.blake256Pad = generateBlake256Pad()
powLimit := new(big.Rat).SetInt(h.cfg.ActiveNet.PowLimit)
Expand Down Expand Up @@ -277,7 +275,7 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
GeneratePayments: h.paymentMgr.generatePayments,
GetBlock: h.getBlock,
GetBlockConfirmations: h.getBlockConfirmations,
Cancel: h.cancel,
Cancel: cancel,
SignalCache: h.SignalCache,
HubWg: h.wg,
}
Expand Down Expand Up @@ -741,7 +739,6 @@ func (h *Hub) AccountExists(accountID string) bool {
// CSRFSecret fetches a persisted secret or generates a new one.
func (h *Hub) CSRFSecret() ([]byte, error) {
secret, err := h.cfg.DB.fetchCSRFSecret()

if err != nil {
if errors.Is(err, errs.ValueNotFound) {
// If the database doesnt contain a CSRF secret, generate one and
Expand Down

0 comments on commit 26ee72b

Please sign in to comment.