Skip to content

Commit

Permalink
fix(flush): do not short circuit if ack is not available (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt authored and omritoptix committed May 8, 2024
1 parent 2312bc8 commit ff1b8ca
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 70 deletions.
9 changes: 9 additions & 0 deletions dymutils/gerr/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package gerr

import "errors"

/*
TODO: this should use the dymint gerr package
*/

var ErrNotFound = errors.New("not found")
17 changes: 10 additions & 7 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewCosmosChainProcessor(
metrics *processor.PrometheusMetrics,
) *CosmosChainProcessor {
return &CosmosChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
log: log.With(zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
latestClientState: make(latestClientState),
connectionStateCache: make(processor.ConnectionStateCache),
Expand Down Expand Up @@ -493,8 +493,16 @@ func (ccp *CosmosChainProcessor) queryCycle(
messages := chains.IbcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64)

for _, m := range messages {
if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID && int64(stuckPacket.StartHeight) <= i && i <= int64(stuckPacket.EndHeight) {
switch t := m.Info.(type) {
case *chains.PacketInfo:
ccp.log.Info("found stuck packet message", zap.Any("seq", t.Sequence), zap.Any("height", t.Height))
}
ccp.log.Debug("found stuck message (all data)", zap.Any("msg", m))
}
ccp.handleMessage(ctx, m, ibcMessagesCache)
}

}

newLatestQueriedBlock = i
Expand All @@ -506,8 +514,7 @@ func (ccp *CosmosChainProcessor) queryCycle(
i = persistence.latestHeight

newLatestQueriedBlock = afterUnstuck
// newLatestQueriedBlock = persistence.latestHeight // this line fixes it, but why?
ccp.log.Info("Parsed stuck packet height, skipping to current", zap.Any("new latest queried block", persistence.latestHeight))
ccp.log.Info("Parsed stuck packet height, skipping to current", zap.Any("new latest queried block", newLatestQueriedBlock))
}

if i%100 == 0 {
Expand Down Expand Up @@ -541,10 +548,6 @@ func (ccp *CosmosChainProcessor) queryCycle(
continue
}

if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID {
ccp.log.Info("sending new data to the path processor", zap.Bool("inSync", ccp.inSync))
}

pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
LatestHeader: latestHeader,
Expand Down
9 changes: 4 additions & 5 deletions relayer/chains/cosmos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
host "github.com/cosmos/ibc-go/v8/modules/core/24-host"
ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported"
tmclient "github.com/cosmos/ibc-go/v8/modules/light-clients/07-tendermint"
"github.com/cosmos/relayer/v2/dymutils/gerr"
"github.com/cosmos/relayer/v2/relayer/chains"
"github.com/cosmos/relayer/v2/relayer/provider"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -321,7 +323,6 @@ func (cc *CosmosProvider) queryParamsSubspaceTime(ctx context.Context, subspace
params := proposal.QueryParamsRequest{Subspace: subspace, Key: key}

res, err := queryClient.Params(ctx, &params)

if err != nil {
return 0, fmt.Errorf("failed to make %s params request: %w", subspace, err)
}
Expand All @@ -340,14 +341,13 @@ func (cc *CosmosProvider) queryParamsSubspaceTime(ctx context.Context, subspace

// QueryUnbondingPeriod returns the unbonding period of the chain
func (cc *CosmosProvider) QueryUnbondingPeriod(ctx context.Context) (time.Duration, error) {

// Attempt ICS query
consumerUnbondingPeriod, consumerErr := cc.queryParamsSubspaceTime(ctx, "ccvconsumer", "UnbondingPeriod")
if consumerErr == nil {
return consumerUnbondingPeriod, nil
}

//Attempt Staking query.
// Attempt Staking query.
unbondingPeriod, stakingParamsErr := cc.queryParamsSubspaceTime(ctx, "staking", "UnbondingTime")
if stakingParamsErr == nil {
return unbondingPeriod, nil
Expand All @@ -359,7 +359,6 @@ func (cc *CosmosProvider) QueryUnbondingPeriod(ctx context.Context) (time.Durati
res, err := queryClient.Params(ctx, &req)
if err == nil {
return res.Params.UnbondingTime, nil

}

return 0,
Expand Down Expand Up @@ -1067,7 +1066,7 @@ func (cc *CosmosProvider) QueryRecvPacket(
}
}

return provider.PacketInfo{}, fmt.Errorf("no ibc messages found for write_acknowledgement query: %s", q)
return provider.PacketInfo{}, fmt.Errorf("no ibc messages found for write_acknowledgement query: %s: %w", q, gerr.ErrNotFound)
}

// QueryUnreceivedAcknowledgements returns a list of unrelayed packet acks
Expand Down
30 changes: 19 additions & 11 deletions relayer/chains/cosmos/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ func (cc *CosmosProvider) SendMessagesToMempool(
asyncCtx context.Context,
asyncCallbacks []func(*provider.RelayerTxResponse, error),
) error {
{
types := []string{}
for _, msg := range msgs {
types = append(types, msg.Type())
}
cc.log.Debug("Sending messages to mempool", zap.Any("types", types), zap.Any("chain", cc.PCfg.ChainID))
}

txSignerKey, feegranterKeyOrAddr, err := cc.buildSignerConfig(msgs)
if err != nil {
return err
Expand Down Expand Up @@ -190,6 +198,8 @@ func (cc *CosmosProvider) SendMessagesToMempool(
return err
}

cc.log.Debug("Transaction successfully sent to mempool", zap.String("chain", cc.PCfg.ChainID))

// we had a successful tx broadcast with this sequence, so update it to the next
cc.updateNextAccountSequence(sequenceGuard, sequence+1)
return nil
Expand Down Expand Up @@ -265,7 +275,6 @@ func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo
// TODO: This is related to GRPC client stuff?
// https://github.com/cosmos/cosmos-sdk/blob/5725659684fc93790a63981c653feee33ecf3225/client/tx/tx.go#L297
_, adjusted, err = cc.CalculateGas(ctx, txf, signingKey, msgs...)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -298,16 +307,15 @@ func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo
}

err = func() error {
//done := cc.SetSDKContext()
// done := cc.SetSDKContext()
// ensure that we allways call done, even in case of an error or panic
//defer done()
// defer done()

if err = tx.Sign(ctx, txf, signingKey, txb, false); err != nil {
return err
}
return nil
}()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -387,7 +395,6 @@ func (cc *CosmosProvider) broadcastTx(
address, err := cc.Address()
if err != nil {
return fmt.Errorf("failed to get relayer bech32 wallet address: %w", err)

}
cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), address, fees)

Expand All @@ -413,7 +420,7 @@ func (cc *CosmosProvider) waitForTx(
cc.log.Error("Failed to wait for block inclusion", zap.Error(err))
if len(callbacks) > 0 {
for _, cb := range callbacks {
//Call each callback in order since waitForTx is already invoked asyncronously
// Call each callback in order since waitForTx is already invoked asyncronously
cb(nil, err)
}
}
Expand Down Expand Up @@ -441,7 +448,7 @@ func (cc *CosmosProvider) waitForTx(
}
if len(callbacks) > 0 {
for _, cb := range callbacks {
//Call each callback in order since waitForTx is already invoked asyncronously
// Call each callback in order since waitForTx is already invoked asyncronously
cb(nil, err)
}
}
Expand All @@ -451,7 +458,7 @@ func (cc *CosmosProvider) waitForTx(

if len(callbacks) > 0 {
for _, cb := range callbacks {
//Call each callback in order since waitForTx is already invoked asyncronously
// Call each callback in order since waitForTx is already invoked asyncronously
cb(rlyResp, nil)
}
}
Expand Down Expand Up @@ -645,7 +652,6 @@ func (cc *CosmosProvider) buildMessages(

if gas == 0 {
_, adjusted, err = cc.CalculateGas(ctx, txf, txSignerKey, cMsgs...)

if err != nil {
return nil, 0, sdk.Coins{}, err
}
Expand Down Expand Up @@ -755,9 +761,11 @@ func (cc *CosmosProvider) MsgUpgradeClient(srcClientId string, consRes *clientty
return nil, err
}

msgUpgradeClient := &clienttypes.MsgUpgradeClient{ClientId: srcClientId, ClientState: clientRes.ClientState,
msgUpgradeClient := &clienttypes.MsgUpgradeClient{
ClientId: srcClientId, ClientState: clientRes.ClientState,
ConsensusState: consRes.ConsensusState, ProofUpgradeClient: consRes.GetProof(),
ProofUpgradeConsensusState: consRes.ConsensusState.Value, Signer: acc}
ProofUpgradeConsensusState: consRes.ConsensusState.Value, Signer: acc,
}

return NewCosmosMessage(msgUpgradeClient, func(signer string) {
msgUpgradeClient.Signer = signer
Expand Down
6 changes: 3 additions & 3 deletions relayer/processor/message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ func (mp *messageProcessor) assembleMessage(
mp.trackMessage(msg.tracker(assembled), i)
wg.Done()
if err != nil {
dst.log.Error(fmt.Sprintf("Error assembling %s message", msg.msgType()),
dst.log.Error(fmt.Sprintf("Error assembling message: %s", msg.msgType()),
zap.Object("msg", msg),
zap.Error(err),
)
return
}
dst.log.Debug(fmt.Sprintf("Assembled %s message", msg.msgType()), zap.Object("msg", msg))
dst.log.Debug(fmt.Sprintf("Assembled message: %s", msg.msgType()), zap.Object("msg", msg))
}

// assembleMsgUpdateClient uses the ChainProvider from both pathEnds to assemble the client update header
Expand Down Expand Up @@ -509,7 +509,7 @@ func (mp *messageProcessor) sendBatchMessages(
mp.log.Debug("Redundant message(s)", errFields...)
return
}
mp.log.Error("Error sending messages", errFields...)
mp.log.Error("Sending messages from batch", errFields...)
return
}
dst.log.Debug("Message broadcast completed", fields...)
Expand Down
1 change: 0 additions & 1 deletion relayer/processor/path_end_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ func (pathEnd *pathEndRuntime) mergeCacheData(
pathEnd.lastClientUpdateHeightMu.Unlock()

pathEnd.inSync = d.InSync
pathEnd.log.Debug("set in sync", zap.Bool("in_sync", pathEnd.inSync), zap.String("chain_id", pathEnd.info.ChainID))

pathEnd.latestHeader = d.LatestHeader
pathEnd.clientState = d.ClientState
Expand Down
9 changes: 9 additions & 0 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ func (pp *PathProcessor) HandleNewData(chainID string, cacheData ChainProcessorC

func (pp *PathProcessor) handleFlush(ctx context.Context) {
flushTimer := pp.flushInterval
pp.log.Debug("Flushing PathProcessor (handleFlush)")
if err := pp.flush(ctx); err != nil {
pp.log.Warn("Flush not complete", zap.Error(err))
flushTimer = flushFailureRetry
Expand Down Expand Up @@ -415,6 +416,14 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
pp.handleFlush(ctx)
pp.initialFlushComplete = true
} else if pp.shouldTerminateForFlushComplete() {
pp.log.Debug("PathProcessor terminating due to flush completion. Blocking until finished. CTRL-C!")

/*
NOTE: it is possible that there are still outstanding broadcasts
This cancel will cancel them
In the future, we may want to wait for them to finish (<-pp.pathEnd1.finishedProcessing etc)
*/

cancel()
return
}
Expand Down
Loading

0 comments on commit ff1b8ca

Please sign in to comment.