Skip to content

Commit

Permalink
fix(delayed ack support): adds no flush flag (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt authored and omritoptix committed May 16, 2024
1 parent 12fd436 commit 91c92e9
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 86 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- stuck packet search now syncs correctly
- will no longer unnecessarily wait for a block to elapse before syncing (good for slow chains)
- recommended to use fast query loop for fast rollapp chains (`min-loop-duration: 100ms`)
- use `start --no-flush` to avoid flushing completely

![banner](./docs/images/comp.gif)

Expand Down
14 changes: 14 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,20 @@ func flushIntervalFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
return cmd
}

func noFlushFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
cmd.Flags().Bool(
"no-flush",
false,
"avoid flushing entirely (normal behavior is to flush once at start and then on interval)",
)

if err := v.BindPFlag("no-flush", cmd.Flags().Lookup("no-flush")); err != nil {
panic(err)
}

return cmd
}

func memoFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
cmd.Flags().String(flagMemo, "", "a memo to include in relayed packets")
if err := v.BindPFlag(flagMemo, cmd.Flags().Lookup(flagMemo)); err != nil {
Expand Down
26 changes: 9 additions & 17 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,28 +149,19 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
return err
}

noFlush, err := cmd.Flags().GetBool("no-flush")
// TODO: us eit
_ = noFlush
if err != nil {
return err
}

stuckPacket, err := parseStuckPacketFromFlags(cmd)
if err != nil {
return err
}

rlyErrCh := relayer.StartRelayer(
cmd.Context(),
a.log,
chains,
paths,
maxMsgLength,
a.config.Global.MaxReceiverSize,
a.config.Global.ICS20MemoLimit,
a.config.memo(cmd),
clientUpdateThresholdTime,
flushInterval,
nil,
processorType,
initialBlockHistory,
prometheusMetrics,
stuckPacket,
)
rlyErrCh := relayer.StartRelayer(cmd.Context(), a.log, chains, paths, maxMsgLength, a.config.Global.MaxReceiverSize, a.config.Global.ICS20MemoLimit, a.config.memo(cmd), clientUpdateThresholdTime, flushInterval, nil, processorType, initialBlockHistory, prometheusMetrics, stuckPacket, noFlush)

// Block until the error channel sends a message.
// The context being canceled will cause the relayer to stop,
Expand All @@ -192,6 +183,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
cmd = processorFlag(a.viper, cmd)
cmd = initBlockFlag(a.viper, cmd)
cmd = flushIntervalFlag(a.viper, cmd)
cmd = noFlushFlag(a.viper, cmd)
cmd = memoFlag(a.viper, cmd)
cmd = stuckPacketFlags(a.viper, cmd)
return cmd
Expand Down
18 changes: 1 addition & 17 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,23 +952,7 @@ $ %s tx flush demo-path channel-0`,
ctx, cancel := context.WithTimeout(cmd.Context(), flushTimeout)
defer cancel()

rlyErrCh := relayer.StartRelayer(
ctx,
a.log,
chains,
paths,
maxMsgLength,
a.config.Global.MaxReceiverSize,
a.config.Global.ICS20MemoLimit,
a.config.memo(cmd),
0,
0,
&processor.FlushLifecycle{},
relayer.ProcessorEvents,
0,
nil,
stuckPacket,
)
rlyErrCh := relayer.StartRelayer(ctx, a.log, chains, paths, maxMsgLength, a.config.Global.MaxReceiverSize, a.config.Global.ICS20MemoLimit, a.config.memo(cmd), 0, 0, &processor.FlushLifecycle{}, relayer.ProcessorEvents, 0, nil, stuckPacket, false)

// Block until the error channel sends a message.
// The context being canceled will cause the relayer to stop,
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/mock/mock_chain_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestMockChainAndPathProcessors(t *testing.T) {
flushInterval := 6 * time.Hour

pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "",
clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength, 0, 1)
clientUpdateThresholdTime, flushInterval, false, relayer.DefaultMaxMsgLength, 0, 1)

eventProcessor := processor.NewEventProcessor().
WithChainProcessors(
Expand Down
3 changes: 3 additions & 0 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *Chain) CreateOpenChannels(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
false,
DefaultMaxMsgLength,
0,
0,
Expand Down Expand Up @@ -134,6 +135,7 @@ func (c *Chain) CloseChannel(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
false,
DefaultMaxMsgLength,
0,
0,
Expand Down Expand Up @@ -174,6 +176,7 @@ func (c *Chain) CloseChannel(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
false,
DefaultMaxMsgLength,
0,
0,
Expand Down
1 change: 1 addition & 0 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *Chain) CreateOpenConnections(
memo,
DefaultClientUpdateThreshold,
DefaultFlushInterval,
false,
DefaultMaxMsgLength,
0,
0,
Expand Down
5 changes: 5 additions & 0 deletions relayer/processor/message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,17 @@ func (mp *messageProcessor) sendBatchMessages(
for _, t := range batch {
dst.finishedProcessing <- t
}
var mTypes []string
for _, m := range msgs {
mTypes = append(mTypes, m.Type())
}
errFields := []zapcore.Field{
zap.String("path_name", src.info.PathName),
zap.String("src_chain_id", src.info.ChainID),
zap.String("dst_chain_id", dst.info.ChainID),
zap.String("src_client_id", src.info.ClientID),
zap.String("dst_client_id", dst.info.ClientID),
zap.Any("types", mTypes),
zap.Error(err),
}

Expand Down
7 changes: 7 additions & 0 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type PathProcessor struct {
initialFlushComplete bool
flushTimer *time.Timer
flushInterval time.Duration
noFlush bool

// Signals to retry.
retryProcess chan struct{}
Expand Down Expand Up @@ -100,6 +101,7 @@ func NewPathProcessor(
memo string,
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
noFlush bool,
maxMsgs uint64,
memoLimit, maxReceiverSize int,
) *PathProcessor {
Expand All @@ -118,6 +120,7 @@ func NewPathProcessor(
maxMsgs: maxMsgs,
memoLimit: memoLimit,
maxReceiverSize: maxReceiverSize,
noFlush: noFlush,
}
if flushInterval == 0 {
pp.disablePeriodicFlush()
Expand Down Expand Up @@ -294,6 +297,10 @@ func (pp *PathProcessor) HandleNewData(chainID string, cacheData ChainProcessorC
}

func (pp *PathProcessor) handleFlush(ctx context.Context) {
if pp.noFlush {
pp.log.Debug("Not flushing - disabled.")
return
}
flushTimer := pp.flushInterval
if err := pp.flush(ctx); err != nil {
pp.log.Warn("Flush not complete", zap.Error(err))
Expand Down
55 changes: 4 additions & 51 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,7 @@ const (
)

// StartRelayer starts the main relaying loop and returns a channel that will contain any control-flow related errors.
func StartRelayer(
ctx context.Context,
log *zap.Logger,
chains map[string]*Chain,
paths []NamedPath,
maxMsgLength uint64,
maxReceiverSize,
memoLimit int,
memo string,
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
messageLifecycle processor.MessageLifecycle,
processorType string,
initialBlockHistory uint64,
metrics *processor.PrometheusMetrics,
stuckPacket *processor.StuckPacket,
) chan error {
func StartRelayer(ctx context.Context, log *zap.Logger, chains map[string]*Chain, paths []NamedPath, maxMsgLength uint64, maxReceiverSize, memoLimit int, memo string, clientUpdateThresholdTime, flushInterval time.Duration, messageLifecycle processor.MessageLifecycle, processorType string, initialBlockHistory uint64, metrics *processor.PrometheusMetrics, stuckPacket *processor.StuckPacket, noFlush bool) chan error {
// prevent incorrect bech32 address prefixed addresses when calling AccAddress.String()
sdk.SetAddrCacheEnabled(false)
errorChan := make(chan error, 1)
Expand Down Expand Up @@ -94,23 +78,7 @@ func StartRelayer(
}
}

go relayerStartEventProcessor(
ctx,
log,
chainProcessors,
ePaths,
initialBlockHistory,
maxMsgLength,
maxReceiverSize,
memoLimit,
memo,
messageLifecycle,
clientUpdateThresholdTime,
flushInterval,
errorChan,
metrics,
stuckPacket,
)
go relayerStartEventProcessor(ctx, log, chainProcessors, ePaths, initialBlockHistory, maxMsgLength, maxReceiverSize, memoLimit, memo, messageLifecycle, clientUpdateThresholdTime, flushInterval, errorChan, metrics, stuckPacket, noFlush)
return errorChan
case ProcessorLegacy:
if len(paths) != 1 {
Expand Down Expand Up @@ -151,23 +119,7 @@ func (c *Chain) chainProcessor(
}

// relayerStartEventProcessor is the main relayer process when using the event processor.
func relayerStartEventProcessor(
ctx context.Context,
log *zap.Logger,
chainProcessors []processor.ChainProcessor,
paths []path,
initialBlockHistory uint64,
maxMsgLength uint64,
maxReceiverSize,
memoLimit int,
memo string,
messageLifecycle processor.MessageLifecycle,
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
errCh chan<- error,
metrics *processor.PrometheusMetrics,
stuckPacket *processor.StuckPacket,
) {
func relayerStartEventProcessor(ctx context.Context, log *zap.Logger, chainProcessors []processor.ChainProcessor, paths []path, initialBlockHistory, maxMsgLength uint64, maxReceiverSize, memoLimit int, memo string, messageLifecycle processor.MessageLifecycle, clientUpdateThresholdTime, flushInterval time.Duration, errCh chan<- error, metrics *processor.PrometheusMetrics, stuckPacket *processor.StuckPacket, noFlush bool) {
defer close(errCh)

epb := processor.NewEventProcessor().
Expand All @@ -184,6 +136,7 @@ func relayerStartEventProcessor(
memo,
clientUpdateThresholdTime,
flushInterval,
noFlush,
maxMsgLength,
memoLimit,
maxReceiverSize,
Expand Down

0 comments on commit 91c92e9

Please sign in to comment.