From 68e28ac339c941e1fbef2148e448e39f5e513c5e Mon Sep 17 00:00:00 2001 From: "luke.lin" Date: Fri, 22 Jan 2021 16:23:52 -0800 Subject: [PATCH 1/4] Add stacktrace to reconciliation errors --- go.mod | 2 +- reconciler/reconciler.go | 189 ++++++++++++++++++++++++++------------- 2 files changed, 126 insertions(+), 65 deletions(-) diff --git a/go.mod b/go.mod index f7e765eb..d524f720 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77 github.com/mitchellh/mapstructure v1.3.3 github.com/neilotoole/errgroup v0.1.5 - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/segmentio/fasthash v1.0.3 github.com/stretchr/objx v0.1.1 // indirect github.com/stretchr/testify v1.6.1 diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index c280f6ce..236617ba 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -27,6 +27,7 @@ import ( storageErrors "github.com/coinbase/rosetta-sdk-go/storage/errors" "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" + pErrors "github.com/pkg/errors" ) // New creates a new Reconciler. @@ -149,7 +150,7 @@ func (r *Reconciler) QueueChanges( }: return nil case <-ctx.Done(): - return ctx.Err() + return pErrors.WithStack(ctx.Err()) } } @@ -159,10 +160,10 @@ func (r *Reconciler) queueWorker(ctx context.Context) error { select { case req := <-r.processQueue: if err := r.queueChanges(ctx, req.Block, req.Changes); err != nil { - return err + return pErrors.WithStack(err) } case <-ctx.Done(): - return ctx.Err() + return pErrors.WithStack(ctx.Err()) } } } @@ -212,7 +213,7 @@ func (r *Reconciler) queueChanges( change.Currency, HeadBehind, ); err != nil { - return err + return pErrors.WithStack(err) } continue @@ -230,7 +231,7 @@ func (r *Reconciler) queueChanges( err := r.inactiveAccountQueue(false, acctCurrency, block, true) r.inactiveQueueMutex.Unlock() if err != nil { - return err + return pErrors.WithStack(err) } // Add change to queueMap before enqueuing to ensure @@ -282,11 +283,14 @@ func (r *Reconciler) CompareBalance( // Head block should be set before we CompareBalance head, err := r.helper.CurrentBlock(ctx, dbTx) if err != nil { - return zeroString, "", 0, fmt.Errorf( - "%w: %v", - ErrGetCurrentBlockFailed, - err, - ) + // FIXME: lin + // return zeroString, "", 0, fmt.Errorf( + // "%w: %v", + // ErrGetCurrentBlockFailed, + // err, + // ) + + return zeroString, "", 0, pErrors.Wrap(ErrGetCurrentBlockFailed, err.Error()) } // Check if live block is < head (or wait) @@ -302,17 +306,32 @@ func (r *Reconciler) CompareBalance( // Check if live block is in store (ensure not reorged) canonical, err := r.helper.CanonicalBlock(ctx, dbTx, liveBlock) if err != nil { - return zeroString, "", 0, fmt.Errorf( - "%w: %v: on live block %+v", + // FIXME: lin + // return zeroString, "", 0, fmt.Errorf( + // "%w: %v: on live block %+v", + // ErrBlockExistsFailed, + // err, + // liveBlock, + // ) + + return zeroString, "", 0, pErrors.Wrapf( ErrBlockExistsFailed, + "%v: on live block %+v", err, liveBlock, ) } if !canonical { - return zeroString, "", head.Index, fmt.Errorf( - "%w %+v", + // FIXME: lin + // return zeroString, "", head.Index, fmt.Errorf( + // "%w %+v", + // ErrBlockGone, + // liveBlock, + // ) + + return zeroString, "", head.Index, pErrors.Wrapf( ErrBlockGone, + "%+v", liveBlock, ) } @@ -326,18 +345,37 @@ func (r *Reconciler) CompareBalance( liveBlock.Index, ) if err != nil { - if errors.Is(err, storageErrors.ErrAccountMissing) { - return zeroString, "", head.Index, fmt.Errorf( - "%w for %+v:%+v", + // FIXME: lin + // if errors.Is(err, storageErrors.ErrAccountMissing) { + // return zeroString, "", head.Index, fmt.Errorf( + // "%w for %+v:%+v", + // storageErrors.ErrAccountMissing, + // account, + // currency, + // ) + // } + + if pErrors.Is(err, storageErrors.ErrAccountMissing) { + return zeroString, "", head.Index, pErrors.Wrapf( storageErrors.ErrAccountMissing, + "for %+v:%+v", account, currency, ) } - return zeroString, "", head.Index, fmt.Errorf( - "%w for %+v:%+v: %v", + // FIXME: lin + // return zeroString, "", head.Index, fmt.Errorf( + // "%w for %+v:%+v: %v", + // ErrGetComputedBalanceFailed, + // account, + // currency, + // err, + // ) + + return zeroString, "", head.Index, pErrors.Wrapf( ErrGetComputedBalanceFailed, + "for %+v:%+v: %v", account, currency, err, @@ -346,7 +384,7 @@ func (r *Reconciler) CompareBalance( difference, err := types.SubtractValues(liveBalance, computedBalance.Value) if err != nil { - return "", "", -1, err + return "", "", -1, pErrors.WithStack(err) } return difference, computedBalance.Value, head.Index, nil @@ -377,9 +415,18 @@ func (r *Reconciler) bestLiveBalance( lookupIndex, ) if err != nil { - return nil, nil, fmt.Errorf( - "%w: unable to get live balance for %s %s at %d", + // FIXME: lin + // return nil, nil, fmt.Errorf( + // "%w: unable to get live balance for %s %s at %d", + // err, + // types.PrintStruct(account), + // types.PrintStruct(currency), + // lookupIndex, + // ) + + return nil, nil, pErrors.Wrapf( err, + "unable to get live balance for %s %s at %d", types.PrintStruct(account), types.PrintStruct(currency), lookupIndex, @@ -411,7 +458,7 @@ func (r *Reconciler) handleBalanceMismatch( if exemption != nil { // Return handler result (regardless if error) so that we don't invoke the handler for // a failed reconciliation as well. - return r.handler.ReconciliationExempt( + return pErrors.WithStack(r.handler.ReconciliationExempt( ctx, reconciliationType, account, @@ -420,7 +467,7 @@ func (r *Reconciler) handleBalanceMismatch( liveBalance, block, exemption, - ) + )) } // If we didn't find a matching exemption, @@ -436,7 +483,7 @@ func (r *Reconciler) handleBalanceMismatch( block, ) if err != nil { // error only returned if we should exit on failure - return err + return pErrors.WithStack(err) } return nil @@ -472,7 +519,9 @@ func (r *Reconciler) accountReconciliation( liveBlock, ) if err != nil { - if errors.Is(err, ErrHeadBlockBehindLive) { + // FIXME: lin + // if errors.Is(err, ErrHeadBlockBehindLive) { + if pErrors.Is(err, ErrHeadBlockBehindLive) { // This error will only occur when lookupBalanceByBlock // is disabled and the syncer is behind the current block of // the node. This error should never occur when @@ -495,16 +544,18 @@ func (r *Reconciler) accountReconciliation( // after this new highWaterMark. r.highWaterMark = liveBlock.Index - return r.handler.ReconciliationSkipped( + return pErrors.WithStack(r.handler.ReconciliationSkipped( ctx, reconciliationType, account, currency, HeadBehind, - ) + )) } - if errors.Is(err, ErrBlockGone) { + // FIXME: lin + // if errors.Is(err, ErrBlockGone) { + if pErrors.Is(err, ErrBlockGone) { // Either the block has not been processed in a re-org yet // or the block was orphaned r.debugLog( @@ -512,16 +563,18 @@ func (r *Reconciler) accountReconciliation( types.PrintStruct(liveBlock), ) - return r.handler.ReconciliationSkipped( + return pErrors.WithStack(r.handler.ReconciliationSkipped( ctx, reconciliationType, account, currency, BlockGone, - ) + )) } - if errors.Is(err, storageErrors.ErrAccountMissing) { + // FIXME: lin + // if errors.Is(err, storageErrors.ErrAccountMissing) { + if pErrors.Is(err, storageErrors.ErrAccountMissing) { // When interesting accounts are specified, // we try to reconcile balances for each of these // accounts at each block height. @@ -547,7 +600,7 @@ func (r *Reconciler) accountReconciliation( } if difference != zeroString { - return r.handleBalanceMismatch( + return pErrors.WithStack(r.handleBalanceMismatch( ctx, difference, reconciliationType, @@ -556,20 +609,20 @@ func (r *Reconciler) accountReconciliation( computedBalance, liveAmount, liveBlock, - ) + )) } - return r.handler.ReconciliationSucceeded( + return pErrors.WithStack(r.handler.ReconciliationSucceeded( ctx, reconciliationType, accountCurrency.Account, accountCurrency.Currency, liveAmount, liveBlock, - ) + )) } - return ctx.Err() + return pErrors.WithStack(ctx.Err()) } func (r *Reconciler) inactiveAccountQueue( @@ -627,12 +680,12 @@ func (r *Reconciler) pruneBalances( return nil } - return r.helper.PruneBalances( + return pErrors.WithStack(r.helper.PruneBalances( ctx, acctCurrency.Account, acctCurrency.Currency, index-safeBalancePruneDepth, - ) + )) } // skipAndPrune calls the ReconciliationSkipped @@ -649,10 +702,10 @@ func (r *Reconciler) skipAndPrune( change.Currency, skipCause, ); err != nil { - return err + return pErrors.WithStack(err) } - return r.updateQueueMap( + return pErrors.WithStack(r.updateQueueMap( ctx, &types.AccountCurrency{ Account: change.Account, @@ -660,7 +713,7 @@ func (r *Reconciler) skipAndPrune( }, change.Block.Index, pruneActiveReconciliation, - ) + )) } // updateQueueMap removes a *parser.BalanceChange @@ -708,7 +761,7 @@ func (r *Reconciler) updateQueueMap( return nil } - return r.pruneBalances(ctx, acctCurrency, index) + return pErrors.WithStack(r.pruneBalances(ctx, acctCurrency, index)) } // reconcileActiveAccounts selects an account @@ -720,7 +773,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol for { select { case <-ctx.Done(): - return ctx.Err() + return pErrors.WithStack(ctx.Err()) case balanceChange := <-r.changeQueue: if balanceChange.Block.Index < r.highWaterMark { r.debugLog( @@ -728,7 +781,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol ) if err := r.skipAndPrune(ctx, balanceChange, HeadBehind); err != nil { - return err + return pErrors.WithStack(err) } continue @@ -745,14 +798,14 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol // context is canceled. if errors.Is(err, context.Canceled) { r.wrappedActiveEnqueue(ctx, balanceChange) - return err + return pErrors.WithStack(err) } tip, tErr := r.helper.IndexAtTip(ctx, balanceChange.Block.Index) switch { case tErr == nil && tip: if err := r.skipAndPrune(ctx, balanceChange, TipFailure); err != nil { - return err + return pErrors.WithStack(err) } continue @@ -760,7 +813,9 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol fmt.Printf("%v: could not determine if at tip\n", tErr) } - return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) + // FIXME: lin + // return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) + return pErrors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) } err = r.accountReconciliation( @@ -774,11 +829,13 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol if err != nil { // Ensure we don't leak reconciliations if // context is canceled. - if errors.Is(err, context.Canceled) { + // FIXME: lin + // if errors.Is(err, context.Canceled) { + if pErrors.Is(err, context.Canceled) { r.wrappedActiveEnqueue(ctx, balanceChange) } - return err + return pErrors.WithStack(err) } // Attempt to prune historical balances that will not be used @@ -792,7 +849,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol balanceChange.Block.Index, pruneActiveReconciliation, ); err != nil { - return err + return pErrors.WithStack(err) } r.updateLastChecked(balanceChange.Block.Index) @@ -893,8 +950,10 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit if err != nil { // Ensure we don't leak reconciliations r.wrappedInactiveEnqueue(nextAcct.Entry, block) - if errors.Is(err, context.Canceled) { - return err + // FIXME: lin + // if errors.Is(err, context.Canceled) { + if pErrors.Is(err, context.Canceled) { + return pErrors.WithStack(err) } tip, tErr := r.helper.IndexAtTip(ctx, head.Index) @@ -907,7 +966,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit nextAcct.Entry.Currency, TipFailure, ); err != nil { - return err + return pErrors.WithStack(err) } if err := r.updateQueueMap( @@ -916,7 +975,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit head.Index, pruneInactiveReconciliation, ); err != nil { - return err + return pErrors.WithStack(err) } continue @@ -924,7 +983,9 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit fmt.Printf("%v: could not determine if at tip\n", tErr) } - return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) + // FIXME: lin + // return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) + return pErrors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) } err = r.accountReconciliation( @@ -937,7 +998,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit ) if err != nil { r.wrappedInactiveEnqueue(nextAcct.Entry, block) - return err + return pErrors.WithStack(err) } // We always prune relative to the index we inserted @@ -950,7 +1011,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit head.Index, pruneInactiveReconciliation, ); err != nil { - return err + return pErrors.WithStack(err) } // Always re-enqueue accounts after they have been inactively @@ -958,7 +1019,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit // these accounts again. err = r.inactiveAccountQueue(true, nextAcct.Entry, block, false) if err != nil { - return err + return pErrors.WithStack(err) } } else { r.inactiveQueueMutex.Unlock() @@ -972,7 +1033,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit } } - return ctx.Err() + return pErrors.WithStack(ctx.Err()) } // Reconcile starts the active and inactive Reconciler goroutines. @@ -980,23 +1041,23 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit func (r *Reconciler) Reconcile(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return r.queueWorker(ctx) + return pErrors.WithStack(r.queueWorker(ctx)) }) for j := 0; j < r.ActiveConcurrency; j++ { g.Go(func() error { - return r.reconcileActiveAccounts(ctx) + return pErrors.WithStack(r.reconcileActiveAccounts(ctx)) }) } for j := 0; j < r.InactiveConcurrency; j++ { g.Go(func() error { - return r.reconcileInactiveAccounts(ctx) + return pErrors.WithStack(r.reconcileInactiveAccounts(ctx)) }) } if err := g.Wait(); err != nil { - return err + return pErrors.WithStack(err) } return nil From 36ef8c8da9af46430f834f1b414b3fd36871a802 Mon Sep 17 00:00:00 2001 From: "luke.lin" Date: Sat, 23 Jan 2021 00:55:46 -0800 Subject: [PATCH 2/4] Clean up fixme --- reconciler/reconciler.go | 163 ++++++++++++--------------------------- 1 file changed, 49 insertions(+), 114 deletions(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index 236617ba..ec31524b 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -16,18 +16,17 @@ package reconciler import ( "context" - "errors" "fmt" "log" "time" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" "github.com/coinbase/rosetta-sdk-go/parser" storageErrors "github.com/coinbase/rosetta-sdk-go/storage/errors" "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" - pErrors "github.com/pkg/errors" ) // New creates a new Reconciler. @@ -150,7 +149,7 @@ func (r *Reconciler) QueueChanges( }: return nil case <-ctx.Done(): - return pErrors.WithStack(ctx.Err()) + return errors.WithStack(ctx.Err()) } } @@ -160,10 +159,10 @@ func (r *Reconciler) queueWorker(ctx context.Context) error { select { case req := <-r.processQueue: if err := r.queueChanges(ctx, req.Block, req.Changes); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } case <-ctx.Done(): - return pErrors.WithStack(ctx.Err()) + return errors.WithStack(ctx.Err()) } } } @@ -213,7 +212,7 @@ func (r *Reconciler) queueChanges( change.Currency, HeadBehind, ); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } continue @@ -231,7 +230,7 @@ func (r *Reconciler) queueChanges( err := r.inactiveAccountQueue(false, acctCurrency, block, true) r.inactiveQueueMutex.Unlock() if err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } // Add change to queueMap before enqueuing to ensure @@ -283,14 +282,7 @@ func (r *Reconciler) CompareBalance( // Head block should be set before we CompareBalance head, err := r.helper.CurrentBlock(ctx, dbTx) if err != nil { - // FIXME: lin - // return zeroString, "", 0, fmt.Errorf( - // "%w: %v", - // ErrGetCurrentBlockFailed, - // err, - // ) - - return zeroString, "", 0, pErrors.Wrap(ErrGetCurrentBlockFailed, err.Error()) + return zeroString, "", 0, errors.Wrap(ErrGetCurrentBlockFailed, err.Error()) } // Check if live block is < head (or wait) @@ -306,15 +298,7 @@ func (r *Reconciler) CompareBalance( // Check if live block is in store (ensure not reorged) canonical, err := r.helper.CanonicalBlock(ctx, dbTx, liveBlock) if err != nil { - // FIXME: lin - // return zeroString, "", 0, fmt.Errorf( - // "%w: %v: on live block %+v", - // ErrBlockExistsFailed, - // err, - // liveBlock, - // ) - - return zeroString, "", 0, pErrors.Wrapf( + return zeroString, "", 0, errors.Wrapf( ErrBlockExistsFailed, "%v: on live block %+v", err, @@ -322,14 +306,7 @@ func (r *Reconciler) CompareBalance( ) } if !canonical { - // FIXME: lin - // return zeroString, "", head.Index, fmt.Errorf( - // "%w %+v", - // ErrBlockGone, - // liveBlock, - // ) - - return zeroString, "", head.Index, pErrors.Wrapf( + return zeroString, "", head.Index, errors.Wrapf( ErrBlockGone, "%+v", liveBlock, @@ -345,18 +322,8 @@ func (r *Reconciler) CompareBalance( liveBlock.Index, ) if err != nil { - // FIXME: lin - // if errors.Is(err, storageErrors.ErrAccountMissing) { - // return zeroString, "", head.Index, fmt.Errorf( - // "%w for %+v:%+v", - // storageErrors.ErrAccountMissing, - // account, - // currency, - // ) - // } - - if pErrors.Is(err, storageErrors.ErrAccountMissing) { - return zeroString, "", head.Index, pErrors.Wrapf( + if errors.Is(err, storageErrors.ErrAccountMissing) { + return zeroString, "", head.Index, errors.Wrapf( storageErrors.ErrAccountMissing, "for %+v:%+v", account, @@ -364,16 +331,7 @@ func (r *Reconciler) CompareBalance( ) } - // FIXME: lin - // return zeroString, "", head.Index, fmt.Errorf( - // "%w for %+v:%+v: %v", - // ErrGetComputedBalanceFailed, - // account, - // currency, - // err, - // ) - - return zeroString, "", head.Index, pErrors.Wrapf( + return zeroString, "", head.Index, errors.Wrapf( ErrGetComputedBalanceFailed, "for %+v:%+v: %v", account, @@ -384,7 +342,7 @@ func (r *Reconciler) CompareBalance( difference, err := types.SubtractValues(liveBalance, computedBalance.Value) if err != nil { - return "", "", -1, pErrors.WithStack(err) + return "", "", -1, errors.WithStack(err) } return difference, computedBalance.Value, head.Index, nil @@ -415,16 +373,7 @@ func (r *Reconciler) bestLiveBalance( lookupIndex, ) if err != nil { - // FIXME: lin - // return nil, nil, fmt.Errorf( - // "%w: unable to get live balance for %s %s at %d", - // err, - // types.PrintStruct(account), - // types.PrintStruct(currency), - // lookupIndex, - // ) - - return nil, nil, pErrors.Wrapf( + return nil, nil, errors.Wrapf( err, "unable to get live balance for %s %s at %d", types.PrintStruct(account), @@ -458,7 +407,7 @@ func (r *Reconciler) handleBalanceMismatch( if exemption != nil { // Return handler result (regardless if error) so that we don't invoke the handler for // a failed reconciliation as well. - return pErrors.WithStack(r.handler.ReconciliationExempt( + return errors.WithStack(r.handler.ReconciliationExempt( ctx, reconciliationType, account, @@ -483,7 +432,7 @@ func (r *Reconciler) handleBalanceMismatch( block, ) if err != nil { // error only returned if we should exit on failure - return pErrors.WithStack(err) + return errors.WithStack(err) } return nil @@ -519,9 +468,7 @@ func (r *Reconciler) accountReconciliation( liveBlock, ) if err != nil { - // FIXME: lin - // if errors.Is(err, ErrHeadBlockBehindLive) { - if pErrors.Is(err, ErrHeadBlockBehindLive) { + if errors.Is(err, ErrHeadBlockBehindLive) { // This error will only occur when lookupBalanceByBlock // is disabled and the syncer is behind the current block of // the node. This error should never occur when @@ -544,7 +491,7 @@ func (r *Reconciler) accountReconciliation( // after this new highWaterMark. r.highWaterMark = liveBlock.Index - return pErrors.WithStack(r.handler.ReconciliationSkipped( + return errors.WithStack(r.handler.ReconciliationSkipped( ctx, reconciliationType, account, @@ -553,9 +500,7 @@ func (r *Reconciler) accountReconciliation( )) } - // FIXME: lin - // if errors.Is(err, ErrBlockGone) { - if pErrors.Is(err, ErrBlockGone) { + if errors.Is(err, ErrBlockGone) { // Either the block has not been processed in a re-org yet // or the block was orphaned r.debugLog( @@ -563,7 +508,7 @@ func (r *Reconciler) accountReconciliation( types.PrintStruct(liveBlock), ) - return pErrors.WithStack(r.handler.ReconciliationSkipped( + return errors.WithStack(r.handler.ReconciliationSkipped( ctx, reconciliationType, account, @@ -572,9 +517,7 @@ func (r *Reconciler) accountReconciliation( )) } - // FIXME: lin - // if errors.Is(err, storageErrors.ErrAccountMissing) { - if pErrors.Is(err, storageErrors.ErrAccountMissing) { + if errors.Is(err, storageErrors.ErrAccountMissing) { // When interesting accounts are specified, // we try to reconcile balances for each of these // accounts at each block height. @@ -600,7 +543,7 @@ func (r *Reconciler) accountReconciliation( } if difference != zeroString { - return pErrors.WithStack(r.handleBalanceMismatch( + return errors.WithStack(r.handleBalanceMismatch( ctx, difference, reconciliationType, @@ -612,7 +555,7 @@ func (r *Reconciler) accountReconciliation( )) } - return pErrors.WithStack(r.handler.ReconciliationSucceeded( + return errors.WithStack(r.handler.ReconciliationSucceeded( ctx, reconciliationType, accountCurrency.Account, @@ -622,7 +565,7 @@ func (r *Reconciler) accountReconciliation( )) } - return pErrors.WithStack(ctx.Err()) + return errors.WithStack(ctx.Err()) } func (r *Reconciler) inactiveAccountQueue( @@ -680,7 +623,7 @@ func (r *Reconciler) pruneBalances( return nil } - return pErrors.WithStack(r.helper.PruneBalances( + return errors.WithStack(r.helper.PruneBalances( ctx, acctCurrency.Account, acctCurrency.Currency, @@ -702,10 +645,10 @@ func (r *Reconciler) skipAndPrune( change.Currency, skipCause, ); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } - return pErrors.WithStack(r.updateQueueMap( + return errors.WithStack(r.updateQueueMap( ctx, &types.AccountCurrency{ Account: change.Account, @@ -761,7 +704,7 @@ func (r *Reconciler) updateQueueMap( return nil } - return pErrors.WithStack(r.pruneBalances(ctx, acctCurrency, index)) + return errors.WithStack(r.pruneBalances(ctx, acctCurrency, index)) } // reconcileActiveAccounts selects an account @@ -773,7 +716,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol for { select { case <-ctx.Done(): - return pErrors.WithStack(ctx.Err()) + return errors.WithStack(ctx.Err()) case balanceChange := <-r.changeQueue: if balanceChange.Block.Index < r.highWaterMark { r.debugLog( @@ -781,7 +724,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol ) if err := r.skipAndPrune(ctx, balanceChange, HeadBehind); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } continue @@ -798,14 +741,14 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol // context is canceled. if errors.Is(err, context.Canceled) { r.wrappedActiveEnqueue(ctx, balanceChange) - return pErrors.WithStack(err) + return errors.WithStack(err) } tip, tErr := r.helper.IndexAtTip(ctx, balanceChange.Block.Index) switch { case tErr == nil && tip: if err := r.skipAndPrune(ctx, balanceChange, TipFailure); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } continue @@ -813,9 +756,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol fmt.Printf("%v: could not determine if at tip\n", tErr) } - // FIXME: lin - // return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) - return pErrors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) + return errors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) } err = r.accountReconciliation( @@ -829,13 +770,11 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol if err != nil { // Ensure we don't leak reconciliations if // context is canceled. - // FIXME: lin - // if errors.Is(err, context.Canceled) { - if pErrors.Is(err, context.Canceled) { + if errors.Is(err, context.Canceled) { r.wrappedActiveEnqueue(ctx, balanceChange) } - return pErrors.WithStack(err) + return errors.WithStack(err) } // Attempt to prune historical balances that will not be used @@ -849,7 +788,7 @@ func (r *Reconciler) reconcileActiveAccounts(ctx context.Context) error { // nol balanceChange.Block.Index, pruneActiveReconciliation, ); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } r.updateLastChecked(balanceChange.Block.Index) @@ -950,10 +889,8 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit if err != nil { // Ensure we don't leak reconciliations r.wrappedInactiveEnqueue(nextAcct.Entry, block) - // FIXME: lin - // if errors.Is(err, context.Canceled) { - if pErrors.Is(err, context.Canceled) { - return pErrors.WithStack(err) + if errors.Is(err, context.Canceled) { + return errors.WithStack(err) } tip, tErr := r.helper.IndexAtTip(ctx, head.Index) @@ -966,7 +903,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit nextAcct.Entry.Currency, TipFailure, ); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } if err := r.updateQueueMap( @@ -975,7 +912,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit head.Index, pruneInactiveReconciliation, ); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } continue @@ -983,9 +920,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit fmt.Printf("%v: could not determine if at tip\n", tErr) } - // FIXME: lin - // return fmt.Errorf("%w: %v", ErrLiveBalanceLookupFailed, err) - return pErrors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) + return errors.Wrapf(ErrLiveBalanceLookupFailed, "%v", err) } err = r.accountReconciliation( @@ -998,7 +933,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit ) if err != nil { r.wrappedInactiveEnqueue(nextAcct.Entry, block) - return pErrors.WithStack(err) + return errors.WithStack(err) } // We always prune relative to the index we inserted @@ -1011,7 +946,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit head.Index, pruneInactiveReconciliation, ); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } // Always re-enqueue accounts after they have been inactively @@ -1019,7 +954,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit // these accounts again. err = r.inactiveAccountQueue(true, nextAcct.Entry, block, false) if err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } } else { r.inactiveQueueMutex.Unlock() @@ -1033,7 +968,7 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit } } - return pErrors.WithStack(ctx.Err()) + return errors.WithStack(ctx.Err()) } // Reconcile starts the active and inactive Reconciler goroutines. @@ -1041,23 +976,23 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit func (r *Reconciler) Reconcile(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return pErrors.WithStack(r.queueWorker(ctx)) + return errors.WithStack(r.queueWorker(ctx)) }) for j := 0; j < r.ActiveConcurrency; j++ { g.Go(func() error { - return pErrors.WithStack(r.reconcileActiveAccounts(ctx)) + return errors.WithStack(r.reconcileActiveAccounts(ctx)) }) } for j := 0; j < r.InactiveConcurrency; j++ { g.Go(func() error { - return pErrors.WithStack(r.reconcileInactiveAccounts(ctx)) + return errors.WithStack(r.reconcileInactiveAccounts(ctx)) }) } if err := g.Wait(); err != nil { - return pErrors.WithStack(err) + return errors.WithStack(err) } return nil From a4ceacf2019ca0f3a76a91f1d698484040afead7 Mon Sep 17 00:00:00 2001 From: "luke.lin" Date: Wed, 27 Jan 2021 16:02:08 -0800 Subject: [PATCH 3/4] add no lint as a workaround --- reconciler/reconciler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index ec31524b..7a22d6f4 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -573,7 +573,7 @@ func (r *Reconciler) inactiveAccountQueue( accountCurrency *types.AccountCurrency, liveBlock *types.BlockIdentifier, hasLock bool, -) error { +) error { // nolint if !hasLock { r.inactiveQueueMutex.Lock(false) defer r.inactiveQueueMutex.Unlock() From 7c38fba08d4d113d0009e7c0b8847019d132c739 Mon Sep 17 00:00:00 2001 From: "luke.lin" Date: Thu, 28 Jan 2021 15:57:05 -0800 Subject: [PATCH 4/4] nit to retain the same error format --- reconciler/reconciler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index 7a22d6f4..a4ff5fb5 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -282,7 +282,7 @@ func (r *Reconciler) CompareBalance( // Head block should be set before we CompareBalance head, err := r.helper.CurrentBlock(ctx, dbTx) if err != nil { - return zeroString, "", 0, errors.Wrap(ErrGetCurrentBlockFailed, err.Error()) + return zeroString, "", 0, errors.Wrapf(ErrGetCurrentBlockFailed, "%v", err) } // Check if live block is < head (or wait)