Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: Make GUI run blocking and wait for shutdown. #370

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 26 additions & 27 deletions dcrpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"

"github.com/decred/dcrd/rpcclient/v8"
"github.com/decred/dcrpool/internal/gui"
Expand Down Expand Up @@ -188,45 +189,43 @@ func realMain() error {
}
}()

// Create a hub and GUI instance.
// Create a hub instance and attempt to perform initial connection and work
// acquisition.
hub, err := newHub(cfg, db, cancel)
if err != nil {
mpLog.Errorf("unable to initialize hub: %v", err)
return err
}
if err := hub.Connect(ctx); err != nil {
mpLog.Errorf("unable to establish node connections: %v", err)
return err
}
if err := hub.FetchWork(ctx); err != nil {
mpLog.Errorf("unable to get work from consensus daemon: %v", err)
return err
}

// Create a gui instance.
gui, err := newGUI(cfg, hub)
if err != nil {
mpLog.Errorf("unable to initialize GUI: %v", err)
return err
}

// Run the GUI in the background.
go gui.Run(ctx)

// Run the hub. This will block until the context is cancelled.
runHub := func(ctx context.Context, h *pool.Hub) error {
// Ideally these would go into hub.Run, but the tests don't work
// properly with this code there due to their tight coupling.
if err := h.Connect(ctx); err != nil {
return fmt.Errorf("unable to establish node connections: %w", err)
}

if err := h.FetchWork(ctx); err != nil {
return fmt.Errorf("unable to get work from consensus daemon: %w", err)
}

h.Run(ctx)
return nil
}
if err := runHub(ctx, hub); err != nil {
// Ensure the GUI is signaled to shutdown.
cancel()
mpLog.Errorf("unable to run pool hub: %v", err)
return err
}
// Run the GUI and hub in the background.
var wg sync.WaitGroup
wg.Add(2)
go func() {
hub.Run(ctx)
wg.Done()
}()
go func() {
gui.Run(ctx)
wg.Done()
}()
wg.Wait()

// hub.Run() blocks until the pool is fully shut down. When it returns,
// write a backup of the DB (if not using postgres), and then close the DB.
// Write a backup of the DB (if not using postgres) once the hub shuts down.
if !cfg.UsePostgres {
mpLog.Info("Backing up database.")
err = db.Backup(pool.BoltBackupFile)
Expand Down
128 changes: 63 additions & 65 deletions internal/gui/gui.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,81 +345,79 @@ func (ui *GUI) Run(ctx context.Context) {

// Use a ticker to periodically update cached data and push updates through
// any established websockets
go func(ctx context.Context) {
signalCh := ui.cfg.FetchCacheChannel()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
hashData, err := ui.cfg.FetchHashData()
signalCh := ui.cfg.FetchCacheChannel()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
hashData, err := ui.cfg.FetchHashData()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateHashData(hashData)
ui.websocketServer.send(payload{
PoolHashRate: ui.cache.getPoolHash(),
})

case msg := <-signalCh:
switch msg {
case pool.Confirmed, pool.Unconfirmed:
work, err := ui.cfg.FetchMinedWork()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateHashData(hashData)
ui.cache.updateMinedWork(work)
ui.websocketServer.send(payload{
PoolHashRate: ui.cache.getPoolHash(),
LastWorkHeight: ui.cfg.FetchLastWorkHeight(),
})

case msg := <-signalCh:
switch msg {
case pool.Confirmed, pool.Unconfirmed:
work, err := ui.cfg.FetchMinedWork()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateMinedWork(work)
ui.websocketServer.send(payload{
LastWorkHeight: ui.cfg.FetchLastWorkHeight(),
})

case pool.ClaimedShare:
quotas, err := ui.cfg.FetchWorkQuotas()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateRewardQuotas(quotas)

case pool.DividendsPaid:
pendingPayments, err := ui.cfg.FetchPendingPayments()
if err != nil {
log.Error(err)
continue
}

archivedPayments, err := ui.cfg.FetchArchivedPayments()
if err != nil {
log.Error(err)
continue
}

ui.cache.updatePayments(pendingPayments, archivedPayments)

lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn, err := ui.cfg.FetchLastPaymentInfo()
if err != nil {
log.Error(err)
continue
}
ui.cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)

ui.websocketServer.send(payload{
LastPaymentHeight: lastPmtHeight,
})

default:
log.Errorf("unknown cache signal received: %v", msg)
case pool.ClaimedShare:
quotas, err := ui.cfg.FetchWorkQuotas()
if err != nil {
log.Error(err)
continue
}

case <-ctx.Done():
return
ui.cache.updateRewardQuotas(quotas)

case pool.DividendsPaid:
pendingPayments, err := ui.cfg.FetchPendingPayments()
if err != nil {
log.Error(err)
continue
}

archivedPayments, err := ui.cfg.FetchArchivedPayments()
if err != nil {
log.Error(err)
continue
}

ui.cache.updatePayments(pendingPayments, archivedPayments)

lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn, err := ui.cfg.FetchLastPaymentInfo()
if err != nil {
log.Error(err)
continue
}
ui.cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)

ui.websocketServer.send(payload{
LastPaymentHeight: lastPmtHeight,
})

default:
log.Errorf("unknown cache signal received: %v", msg)
}

case <-ctx.Done():
return
}
}(ctx)
}
}
Loading