Skip to content

Commit

Permalink
fix(stuck packet): sync properly after finding stuck packets (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt authored May 4, 2024
1 parent 642d476 commit 03f748b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
<div align="center">
<h1>Relayer</h1>

<h2>A note on Dymension patches:</h2>

- stuck packet search now syncs correctly
- will no longer unnecessarily wait for a block to elapse before syncing (good for slow chains)
- recommended to use fast query loop for fast rollapp chains (`min-loop-duration: 100ms`)

![banner](./docs/images/comp.gif)

[![Project Status: Initial Release](https://img.shields.io/badge/repo%20status-active-green.svg?style=flat-square)](https://www.repostatus.org/#active)
Expand Down
29 changes: 24 additions & 5 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
latestQueriedBlock = 0
}

afterUnstuck := latestQueriedBlock
if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID {
latestQueriedBlock = int64(stuckPacket.StartHeight)
}
Expand All @@ -274,7 +275,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
defer ticker.Stop()

for {
if err := ccp.queryCycle(ctx, &persistence, stuckPacket); err != nil {
if err := ccp.queryCycle(ctx, &persistence, stuckPacket, afterUnstuck); err != nil {
return err
}
select {
Expand Down Expand Up @@ -340,7 +341,12 @@ func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) err
return nil
}

func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *queryCyclePersistence, stuckPacket *processor.StuckPacket) error {
func (ccp *CosmosChainProcessor) queryCycle(
ctx context.Context,
persistence *queryCyclePersistence,
stuckPacket *processor.StuckPacket,
afterUnstuck int64,
) error {
status, err := ccp.nodeStatusWithRetry(ctx)
if err != nil {
// don't want to cause CosmosChainProcessor to quit here, can retry again next cycle.
Expand Down Expand Up @@ -370,7 +376,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
if (persistence.latestHeight - persistence.latestQueriedBlock) < int64(defaultInSyncNumBlocksThreshold) {
ccp.inSync = true
firstTimeInSync = true
ccp.log.Info("Chain is in sync")
ccp.log.Info("Chain is in sync", zap.Bool("first time", firstTimeInSync))
} else {
ccp.log.Info("Chain is not yet in sync",
zap.Int64("latest_queried_block", persistence.latestQueriedBlock),
Expand Down Expand Up @@ -398,6 +404,8 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
firstHeightToQuery++
}

startTime := time.Now()

for i := firstHeightToQuery; i <= persistence.latestHeight; i++ {
var (
eg errgroup.Group
Expand Down Expand Up @@ -494,8 +502,17 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
if stuckPacket != nil &&
ccp.chainProvider.ChainId() == stuckPacket.ChainID &&
newLatestQueriedBlock == int64(stuckPacket.EndHeight) {

i = persistence.latestHeight
ccp.log.Debug("Parsed stuck packet height, skipping to current")

newLatestQueriedBlock = afterUnstuck
// newLatestQueriedBlock = persistence.latestHeight // this line fixes it, but why?
ccp.log.Info("Parsed stuck packet height, skipping to current", zap.Any("new latest queried block", persistence.latestHeight))
}

if i%100 == 0 {
elapsed := time.Since(startTime)
ccp.log.Info("Processed block", zap.Int64("height", i), zap.Duration("elapsed", elapsed), zap.Int64("latest", persistence.latestHeight))
}
}

Expand Down Expand Up @@ -524,7 +541,9 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
continue
}

ccp.log.Debug("sending new data to the path processor", zap.Bool("inSync", ccp.inSync))
if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID {
ccp.log.Info("sending new data to the path processor", zap.Bool("inSync", ccp.inSync))
}

pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/cosmos/message_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (ccp *CosmosChainProcessor) handlePacketMessage(eventType string, pi provid
ccp.log.Debug("Retaining packet message",
zap.String("event_type", eventType),
zap.Uint64("sequence", pi.Sequence),
zap.Uint64("height", pi.Height),
zap.Inline(k),
)

Expand Down Expand Up @@ -186,7 +187,6 @@ func (ccp *CosmosChainProcessor) logChannelMessage(message string, ci provider.C

func (ccp *CosmosChainProcessor) logChannelOpenMessage(message string, ci provider.ChannelInfo) {
fields := []zap.Field{

zap.String("channel_id", ci.ChannelID),
zap.String("connection_id", ci.ConnID),
zap.String("port_id", ci.PortID),
Expand Down
1 change: 1 addition & 0 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
retryTimer = time.AfterFunc(durationErrorRetry, pp.ProcessBacklogIfReady)
}
}

}
}

Expand Down

0 comments on commit 03f748b

Please sign in to comment.