diff --git a/dymutils/gerr/types.go b/dymutils/gerr/types.go new file mode 100644 index 000000000..65a2f5b81 --- /dev/null +++ b/dymutils/gerr/types.go @@ -0,0 +1,9 @@ +package gerr + +import "errors" + +/* +TODO: this should use the dymint gerr package +*/ + +var ErrNotFound = errors.New("not found") diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index 30b847dc9..6d5a30fc3 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -62,7 +62,7 @@ func NewCosmosChainProcessor( metrics *processor.PrometheusMetrics, ) *CosmosChainProcessor { return &CosmosChainProcessor{ - log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())), + log: log.With(zap.String("chain_id", provider.ChainId())), chainProvider: provider, latestClientState: make(latestClientState), connectionStateCache: make(processor.ConnectionStateCache), @@ -493,8 +493,16 @@ func (ccp *CosmosChainProcessor) queryCycle( messages := chains.IbcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64) for _, m := range messages { + if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID && int64(stuckPacket.StartHeight) <= i && i <= int64(stuckPacket.EndHeight) { + switch t := m.Info.(type) { + case *chains.PacketInfo: + ccp.log.Info("found stuck packet message", zap.Any("seq", t.Sequence), zap.Any("height", t.Height)) + } + ccp.log.Debug("found stuck message (all data)", zap.Any("msg", m)) + } ccp.handleMessage(ctx, m, ibcMessagesCache) } + } newLatestQueriedBlock = i @@ -506,8 +514,7 @@ func (ccp *CosmosChainProcessor) queryCycle( i = persistence.latestHeight newLatestQueriedBlock = afterUnstuck - // newLatestQueriedBlock = persistence.latestHeight // this line fixes it, but why? 
- ccp.log.Info("Parsed stuck packet height, skipping to current", zap.Any("new latest queried block", persistence.latestHeight)) + ccp.log.Info("Parsed stuck packet height, skipping to current", zap.Any("new latest queried block", newLatestQueriedBlock)) } if i%100 == 0 { @@ -541,10 +548,6 @@ func (ccp *CosmosChainProcessor) queryCycle( continue } - if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID { - ccp.log.Info("sending new data to the path processor", zap.Bool("inSync", ccp.inSync)) - } - pp.HandleNewData(chainID, processor.ChainProcessorCacheData{ LatestBlock: ccp.latestBlock, LatestHeader: latestHeader, diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 48886072f..15b15044e 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -32,8 +32,10 @@ import ( host "github.com/cosmos/ibc-go/v8/modules/core/24-host" ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v8/modules/light-clients/07-tendermint" + "github.com/cosmos/relayer/v2/dymutils/gerr" "github.com/cosmos/relayer/v2/relayer/chains" "github.com/cosmos/relayer/v2/relayer/provider" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc/metadata" @@ -321,7 +323,6 @@ func (cc *CosmosProvider) queryParamsSubspaceTime(ctx context.Context, subspace params := proposal.QueryParamsRequest{Subspace: subspace, Key: key} res, err := queryClient.Params(ctx, ¶ms) - if err != nil { return 0, fmt.Errorf("failed to make %s params request: %w", subspace, err) } @@ -340,14 +341,13 @@ func (cc *CosmosProvider) queryParamsSubspaceTime(ctx context.Context, subspace // QueryUnbondingPeriod returns the unbonding period of the chain func (cc *CosmosProvider) QueryUnbondingPeriod(ctx context.Context) (time.Duration, error) { - // Attempt ICS query consumerUnbondingPeriod, consumerErr := cc.queryParamsSubspaceTime(ctx, "ccvconsumer", "UnbondingPeriod") if consumerErr == nil { return consumerUnbondingPeriod, nil } - //Attempt Staking query. + // Attempt Staking query. 
unbondingPeriod, stakingParamsErr := cc.queryParamsSubspaceTime(ctx, "staking", "UnbondingTime") if stakingParamsErr == nil { return unbondingPeriod, nil @@ -359,7 +359,6 @@ func (cc *CosmosProvider) QueryUnbondingPeriod(ctx context.Context) (time.Durati res, err := queryClient.Params(ctx, &req) if err == nil { return res.Params.UnbondingTime, nil - } return 0, @@ -1067,7 +1066,7 @@ func (cc *CosmosProvider) QueryRecvPacket( } } - return provider.PacketInfo{}, fmt.Errorf("no ibc messages found for write_acknowledgement query: %s", q) + return provider.PacketInfo{}, fmt.Errorf("no ibc messages found for write_acknowledgement query: %s: %w", q, gerr.ErrNotFound) } // QueryUnreceivedAcknowledgements returns a list of unrelayed packet acks diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index 156301ba0..5f223fea3 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -163,6 +163,14 @@ func (cc *CosmosProvider) SendMessagesToMempool( asyncCtx context.Context, asyncCallbacks []func(*provider.RelayerTxResponse, error), ) error { + { + types := []string{} + for _, msg := range msgs { + types = append(types, msg.Type()) + } + cc.log.Debug("Sending messages to mempool", zap.Any("types", types), zap.Any("chain", cc.PCfg.ChainID)) + } + txSignerKey, feegranterKeyOrAddr, err := cc.buildSignerConfig(msgs) if err != nil { return err @@ -190,6 +198,8 @@ func (cc *CosmosProvider) SendMessagesToMempool( return err } + cc.log.Debug("Transaction successfully sent to mempool", zap.String("chain", cc.PCfg.ChainID)) + // we had a successful tx broadcast with this sequence, so update it to the next cc.updateNextAccountSequence(sequenceGuard, sequence+1) return nil @@ -265,7 +275,6 @@ func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo // TODO: This is related to GRPC client stuff? // https://github.com/cosmos/cosmos-sdk/blob/5725659684fc93790a63981c653feee33ecf3225/client/tx/tx.go#L297 _, adjusted, err = cc.CalculateGas(ctx, txf, signingKey, msgs...) 
- if err != nil { return nil, err } @@ -298,16 +307,15 @@ func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo } err = func() error { - //done := cc.SetSDKContext() + // done := cc.SetSDKContext() // ensure that we allways call done, even in case of an error or panic - //defer done() + // defer done() if err = tx.Sign(ctx, txf, signingKey, txb, false); err != nil { return err } return nil }() - if err != nil { return nil, err } @@ -387,7 +395,6 @@ func (cc *CosmosProvider) broadcastTx( address, err := cc.Address() if err != nil { return fmt.Errorf("failed to get relayer bech32 wallet address: %w", err) - } cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), address, fees) @@ -413,7 +420,7 @@ func (cc *CosmosProvider) waitForTx( cc.log.Error("Failed to wait for block inclusion", zap.Error(err)) if len(callbacks) > 0 { for _, cb := range callbacks { - //Call each callback in order since waitForTx is already invoked asyncronously + // Call each callback in order since waitForTx is already invoked asyncronously cb(nil, err) } } @@ -441,7 +448,7 @@ func (cc *CosmosProvider) waitForTx( } if len(callbacks) > 0 { for _, cb := range callbacks { - //Call each callback in order since waitForTx is already invoked asyncronously + // Call each callback in order since waitForTx is already invoked asyncronously cb(nil, err) } } @@ -451,7 +458,7 @@ func (cc *CosmosProvider) waitForTx( if len(callbacks) > 0 { for _, cb := range callbacks { - //Call each callback in order since waitForTx is already invoked asyncronously + // Call each callback in order since waitForTx is already invoked asyncronously cb(rlyResp, nil) } } @@ -645,7 +652,6 @@ func (cc *CosmosProvider) buildMessages( if gas == 0 { _, adjusted, err = cc.CalculateGas(ctx, txf, txSignerKey, cMsgs...) - if err != nil { return nil, 0, sdk.Coins{}, err } @@ -755,9 +761,11 @@ func (cc *CosmosProvider) MsgUpgradeClient(srcClientId string, consRes *clientty return nil, err } - msgUpgradeClient := &clienttypes.MsgUpgradeClient{ClientId: srcClientId, ClientState: clientRes.ClientState, + msgUpgradeClient := &clienttypes.MsgUpgradeClient{ + ClientId: srcClientId, ClientState: clientRes.ClientState, ConsensusState: consRes.ConsensusState, ProofUpgradeClient: consRes.GetProof(), - ProofUpgradeConsensusState: consRes.ConsensusState.Value, Signer: acc} + ProofUpgradeConsensusState: consRes.ConsensusState.Value, Signer: acc, + } return NewCosmosMessage(msgUpgradeClient, func(signer string) { msgUpgradeClient.Signer = signer diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 8fba3b5c7..40dc90b4a 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -246,13 +246,13 @@ func (mp *messageProcessor) assembleMessage( mp.trackMessage(msg.tracker(assembled), i) wg.Done() if err != nil { - dst.log.Error(fmt.Sprintf("Error assembling %s message", msg.msgType()), + dst.log.Error(fmt.Sprintf("Error assembling message: %s", msg.msgType()), zap.Object("msg", msg), zap.Error(err), ) return } - dst.log.Debug(fmt.Sprintf("Assembled %s message", msg.msgType()), zap.Object("msg", msg)) + dst.log.Debug(fmt.Sprintf("Assembled message: %s", msg.msgType()), zap.Object("msg", msg)) } // assembleMsgUpdateClient uses the ChainProvider from both pathEnds to assemble the client update header @@ -509,7 +509,7 @@ func (mp *messageProcessor) sendBatchMessages( mp.log.Debug("Redundant message(s)", errFields...) return } - mp.log.Error("Error sending messages", errFields...) 
+ mp.log.Error("Sending messages from batch", errFields...) return } dst.log.Debug("Message broadcast completed", fields...) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index c31498dd0..6cad29102 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -481,7 +481,6 @@ func (pathEnd *pathEndRuntime) mergeCacheData( pathEnd.lastClientUpdateHeightMu.Unlock() pathEnd.inSync = d.InSync - pathEnd.log.Debug("set in sync", zap.Bool("in_sync", pathEnd.inSync), zap.String("chain_id", pathEnd.info.ChainID)) pathEnd.latestHeader = d.LatestHeader pathEnd.clientState = d.ClientState diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 2d082c26e..15de7d0c0 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -295,6 +295,7 @@ func (pp *PathProcessor) HandleNewData(chainID string, cacheData ChainProcessorC func (pp *PathProcessor) handleFlush(ctx context.Context) { flushTimer := pp.flushInterval + pp.log.Debug("Flushing PathProcessor (handleFlush)") if err := pp.flush(ctx); err != nil { pp.log.Warn("Flush not complete", zap.Error(err)) flushTimer = flushFailureRetry @@ -415,6 +416,14 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { pp.handleFlush(ctx) pp.initialFlushComplete = true } else if pp.shouldTerminateForFlushComplete() { + pp.log.Debug("PathProcessor terminating due to flush completion. Blocking until finished. CTRL-C!") + + /* + NOTE: it is possible that there are still outstanding broadcasts + This cancel will cancel them + In the future, we may want to wait for them to finish (<-pp.pathEnd1.finishedProcessing etc) + */ + cancel() return } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index f6ada4b75..02b5d6503 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -8,6 +8,8 @@ import ( "sort" "sync" + "github.com/cosmos/relayer/v2/dymutils/gerr" + conntypes "github.com/cosmos/ibc-go/v8/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" @@ -1064,6 +1066,14 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func( clientICQMessages: pathEnd2ClientICQMessages, } + if pathEnd1Messages.size() != 0 || pathEnd2Messages.size() != 0 { + pp.log.Debug("Processing some messages", + zap.Any("path1ChainID", pp.pathEnd1.info.ChainID), + zap.Any("pathEnd1Messages", pathEnd1Messages.debugString()), + zap.Any("path2ChainID", pp.pathEnd2.info.ChainID), + zap.Any("pathEnd2Messages", pathEnd2Messages.debugString())) + } + // now assemble and send messages in parallel // if sending messages fails to one pathEnd, we don't need to halt sending to the other pathEnd. 
var eg errgroup.Group @@ -1226,7 +1236,7 @@ func (pp *PathProcessor) queuePendingRecvAndAcks( unrecv, err := dst.chainProvider.QueryUnreceivedPackets(ctx, dst.latestBlock.Height, dstChan, dstPort, seqs) if err != nil { - return nil, err + return nil, fmt.Errorf("query unreceived packets: %w", err) } if pp.metrics != nil { @@ -1240,7 +1250,7 @@ func (pp *PathProcessor) queuePendingRecvAndAcks( if len(unrecv) > 0 { channel, err := dst.chainProvider.QueryChannel(ctx, dstHeight, dstChan, dstPort) if err != nil { - return nil, err + return nil, fmt.Errorf("query channel: %w", err) } order = channel.Channel.Ordering @@ -1248,7 +1258,7 @@ func (pp *PathProcessor) queuePendingRecvAndAcks( if channel.Channel.Ordering == chantypes.ORDERED { nextSeqRecv, err := dst.chainProvider.QueryNextSeqRecv(ctx, dstHeight, dstChan, dstPort) if err != nil { - return nil, err + return nil, fmt.Errorf("query next sequence receive: %w", err) } var newUnrecv []uint64 @@ -1301,7 +1311,7 @@ func (pp *PathProcessor) queuePendingRecvAndAcks( eg.Go(func() error { sendPacket, err := src.chainProvider.QuerySendPacket(ctx, k.ChannelID, k.PortID, seq) if err != nil { - return err + return fmt.Errorf("query send packet: %w", err) } sendPacket.ChannelOrder = order.String() srcMu.Lock() @@ -1321,7 +1331,7 @@ func (pp *PathProcessor) queuePendingRecvAndAcks( } if err := eg.Wait(); err != nil { - return skipped, err + return skipped, fmt.Errorf("eg wait 1: %w", err) } if len(unrecv) > 0 { @@ -1346,7 +1356,15 @@ SeqLoop: continue SeqLoop } } - // does not exist in unrecv, so this is an ack that must be written + /* + The packet is not in unrecv, meaning it has been received by the destination chain + The commitment on the src chain still exists + That means we need to send an ack to the src chain + HOWEVER + In the upstream relayer, they assume the ack is already available + For us, due to delayedack, it may not be available for some time + Thus, we adjust the code to make sure we gracefully handle this, and don't short circuit + */ unacked = append(unacked, seq) } @@ -1354,6 +1372,9 @@ SeqLoop: pp.metrics.SetUnrelayedAcks(pp.pathEnd1.info.PathName, src.info.ChainID, dst.info.ChainID, k.ChannelID, k.CounterpartyChannelID, len(unacked)) } + var unackedAndWillAck []uint64 + var unackedAndWillAckMu sync.Mutex + for i, seq := range unacked { ck := k.Counterparty() @@ -1387,7 +1408,13 @@ SeqLoop: eg.Go(func() error { recvPacket, err := dst.chainProvider.QueryRecvPacket(ctx, k.CounterpartyChannelID, k.CounterpartyPortID, seq) if err != nil { - return err + if !errors.Is(err, gerr.ErrNotFound) { + return fmt.Errorf("query recv packet: seq: dst: %s: %d: %w", dst.info.ChainID, seq, err) + } + /* + It's possible that an acknowledgement event was not yet published on the dst chain + */ + return nil } ck := k.Counterparty() @@ -1397,19 +1424,23 @@ SeqLoop: dstCache.Cache(chantypes.EventTypeWriteAck, ck, seq, recvPacket) dstMu.Unlock() + unackedAndWillAckMu.Lock() + unackedAndWillAck = append(unackedAndWillAck, seq) + unackedAndWillAckMu.Unlock() + return nil }) } if err := eg.Wait(); err != nil { - return skipped, err + return skipped, fmt.Errorf("eg wait 2: %w", err) } - if len(unacked) > 0 { + if len(unackedAndWillAck) > 0 { dst.log.Debug( "Will flush MsgAcknowledgement", zap.Object("channel", k), - zap.Uint64s("sequences", unacked), + zap.Uint64s("sequences", unackedAndWillAck), ) } else { dst.log.Debug( @@ -1465,7 +1496,7 @@ func (pp *PathProcessor) flush(ctx context.Context) error { } if err := eg.Wait(); err != nil { - return 
fmt.Errorf("failed to query packet commitments: %w", err) + return fmt.Errorf("query packet commitments: %w", err) } // From remaining packet commitments, determine if: @@ -1479,7 +1510,7 @@ func (pp *PathProcessor) flush(ctx context.Context) error { eg.Go(func() error { s, err := pp.queuePendingRecvAndAcks(ctx, pp.pathEnd1, pp.pathEnd2, k, seqs, pathEnd1Cache.PacketFlow, pathEnd2Cache.PacketFlow, &pathEnd1CacheMu, &pathEnd2CacheMu) if err != nil { - return err + return fmt.Errorf("queue pending recv and acks: commitments 1: %w", err) } if s != nil { if _, ok := skipped[pp.pathEnd1.info.ChainID]; !ok { @@ -1494,23 +1525,11 @@ func (pp *PathProcessor) flush(ctx context.Context) error { for k, seqs := range commitments2 { k := k seqs := seqs - eg.Go(func() error { - s, err := pp.queuePendingRecvAndAcks( - ctx, - pp.pathEnd2, - pp.pathEnd1, - k, - seqs, - pathEnd2Cache.PacketFlow, - pathEnd1Cache.PacketFlow, - &pathEnd2CacheMu, - &pathEnd1CacheMu, - ) + s, err := pp.queuePendingRecvAndAcks(ctx, pp.pathEnd2, pp.pathEnd1, k, seqs, pathEnd2Cache.PacketFlow, pathEnd1Cache.PacketFlow, &pathEnd2CacheMu, &pathEnd1CacheMu) if err != nil { - return err + return fmt.Errorf("queue pending recv and acks: commitments 2: %w", err) } - if s != nil { if _, ok := skipped[pp.pathEnd2.info.ChainID]; !ok { skipped[pp.pathEnd2.info.ChainID] = make(map[ChannelKey]skippedPackets) @@ -1524,7 +1543,7 @@ func (pp *PathProcessor) flush(ctx context.Context) error { } if err := eg.Wait(); err != nil { - return fmt.Errorf("failed to enqueue pending messages for flush: %w", err) + return fmt.Errorf("enqueue pending messages for flush: %w", err) } pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync, pp.memoLimit, pp.maxReceiverSize) diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index c3079cd12..7e4cb902d 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -14,10 +14,12 @@ import ( "go.uber.org/zap/zapcore" ) -var _ zapcore.ObjectMarshaler = packetIBCMessage{} -var _ zapcore.ObjectMarshaler = channelIBCMessage{} -var _ zapcore.ObjectMarshaler = connectionIBCMessage{} -var _ zapcore.ObjectMarshaler = clientICQMessage{} +var ( + _ zapcore.ObjectMarshaler = packetIBCMessage{} + _ zapcore.ObjectMarshaler = channelIBCMessage{} + _ zapcore.ObjectMarshaler = connectionIBCMessage{} + _ zapcore.ObjectMarshaler = clientICQMessage{} +) // pathEndMessages holds the different IBC messages that // will attempt to be sent to the pathEnd. 
@@ -28,6 +30,37 @@ type pathEndMessages struct { clientICQMessages []clientICQMessage } +func (m *pathEndMessages) size() int { + return len(m.connectionMessages) + len(m.channelMessages) + len(m.packetMessages) + len(m.clientICQMessages) +} + +func (m *pathEndMessages) debugString() string { + type T struct { + ConnectionMessages int + ChannelMessages int + PacketMessages int + ClientICQMessages int + Total int + PacketSequences []int + PacketSequencesIsJustAPreview bool + } + t := T{ + ConnectionMessages: len(m.connectionMessages), + ChannelMessages: len(m.channelMessages), + PacketMessages: len(m.packetMessages), + ClientICQMessages: len(m.clientICQMessages), + Total: m.size(), + } + for _, msg := range m.packetMessages { + t.PacketSequences = append(t.PacketSequences, int(msg.info.Sequence)) + } + if 5 < len(t.PacketSequences) { // dont want to show too many + t.PacketSequences = t.PacketSequences[:5] + t.PacketSequencesIsJustAPreview = true + } + return fmt.Sprintf("%+v", t) +} + type ibcMessage interface { // assemble executes the appropriate proof query function, // then, if successful, assembles the message for the destination. @@ -363,8 +396,10 @@ func (m *processingMessage) setFinishedProcessing(height uint64) { m.processing = false } -type packetProcessingCache map[ChannelKey]packetChannelMessageCache -type packetChannelMessageCache map[string]*packetMessageSendCache +type ( + packetProcessingCache map[ChannelKey]packetChannelMessageCache + packetChannelMessageCache map[string]*packetMessageSendCache +) type packetMessageSendCache struct { mu sync.Mutex @@ -408,11 +443,13 @@ func (c packetChannelMessageCache) deleteMessages(toDelete ...map[string][]uint6 } } -type channelProcessingCache map[string]*channelKeySendCache -type channelKeySendCache struct { - mu sync.Mutex - m map[ChannelKey]*processingMessage -} +type ( + channelProcessingCache map[string]*channelKeySendCache + channelKeySendCache struct { + mu sync.Mutex + m map[ChannelKey]*processingMessage + } +) func newChannelKeySendCache() *channelKeySendCache { return &channelKeySendCache{ @@ -451,11 +488,13 @@ func (c channelProcessingCache) deleteMessages(toDelete ...map[string][]ChannelK } } -type connectionProcessingCache map[string]*connectionKeySendCache -type connectionKeySendCache struct { - mu sync.Mutex - m map[ConnectionKey]*processingMessage -} +type ( + connectionProcessingCache map[string]*connectionKeySendCache + connectionKeySendCache struct { + mu sync.Mutex + m map[ConnectionKey]*processingMessage + } +) func newConnectionKeySendCache() *connectionKeySendCache { return &connectionKeySendCache{
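The recurring pattern in this diff is the new gerr.ErrNotFound sentinel: QueryRecvPacket wraps it with %w, and queuePendingRecvAndAcks checks for it with errors.Is so that an acknowledgement that has not been published yet (possible under delayed acks) is skipped instead of aborting the whole errgroup, while every other failure is still wrapped with context and propagated. The sketch below is a minimal, self-contained approximation of that flow, not the relayer's actual code: queryRecvPacket and flushAcks are hypothetical stand-ins for QueryRecvPacket and queuePendingRecvAndAcks, and a map of acked sequences replaces the real chain queries.

// Minimal sketch of the sentinel-error / errgroup pattern used in this diff.
// All names here are illustrative stand-ins, not the relayer's real API.
package main

import (
	"errors"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// Sentinel comparable to gerr.ErrNotFound in the diff.
var errNotFound = errors.New("not found")

// queryRecvPacket stands in for QueryRecvPacket: it wraps the sentinel with %w
// so callers can detect "no write_acknowledgement event yet" via errors.Is.
func queryRecvPacket(seq uint64, acked map[uint64]bool) (string, error) {
	if !acked[seq] {
		return "", fmt.Errorf("no ibc messages found for write_acknowledgement query: seq=%d: %w", seq, errNotFound)
	}
	return fmt.Sprintf("ack-%d", seq), nil
}

// flushAcks mirrors the loop in queuePendingRecvAndAcks: a missing ack is
// skipped (it may simply not exist yet under delayed acks) rather than
// short-circuiting the flush, while any other error still fails the errgroup.
// Only sequences that actually produced an ack are collected.
func flushAcks(unacked []uint64, acked map[uint64]bool) ([]uint64, error) {
	var (
		eg                errgroup.Group
		mu                sync.Mutex
		unackedAndWillAck []uint64
	)
	for _, seq := range unacked {
		seq := seq
		eg.Go(func() error {
			_, err := queryRecvPacket(seq, acked)
			if err != nil {
				if !errors.Is(err, errNotFound) {
					return fmt.Errorf("query recv packet: seq %d: %w", seq, err)
				}
				// Ack not available yet; skip it and let a later flush pick it up.
				return nil
			}
			mu.Lock()
			unackedAndWillAck = append(unackedAndWillAck, seq)
			mu.Unlock()
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, fmt.Errorf("eg wait: %w", err)
	}
	return unackedAndWillAck, nil
}

func main() {
	acked := map[uint64]bool{2: true, 4: true}
	seqs, err := flushAcks([]uint64{1, 2, 3, 4}, acked)
	if err != nil {
		panic(err)
	}
	fmt.Println("will flush MsgAcknowledgement for:", seqs) // 2 and 4 only
}

Collecting only the sequences that actually yielded an ack into unackedAndWillAck, instead of reusing unacked, keeps the "Will flush MsgAcknowledgement" log and the downstream WriteAck cache in agreement with what was really found; that is the reason the diff introduces the extra slice and its mutex.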