Skip to content

Commit

Permalink
feat(streamer): distribute rewards immediately in the current block
Browse files Browse the repository at this point in the history
  • Loading branch information
keruch committed Sep 2, 2024
1 parent 25caeb2 commit a335aee
Show file tree
Hide file tree
Showing 15 changed files with 654 additions and 330 deletions.
29 changes: 13 additions & 16 deletions internal/pagination/paginate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,21 @@ type Iterator[T any] interface {
Valid() bool
}

type Stop bool

const (
Break Stop = true
Continue Stop = false
)

// Paginate is a function that paginates over an iterator. The callback is executed for each iteration and if it
// returns true, the pagination stops. The function returns the amount of iterations before stopping.
func Paginate[T any](iter Iterator[T], perPage uint64, cb func(T) Stop) uint64 {
// returns true, the pagination stops. The callback also returns the weight of the iteration. That is, one iteration
// may be counted as multiple iterations. For example, in case if the called decides that the iteration is heavy
// or time-consuming. The function returns the amount of iterations before stopping.
func Paginate[T any](
iter Iterator[T],
perPage uint64,
cb func(T) (stop bool, weight uint64),
) uint64 {
iterations := uint64(0)
for ; iterations < perPage && iter.Valid(); iter.Next() {
iterations++

stop := cb(iter.Value())
if stop {
break
}
stop := false
for ; !stop && iterations < perPage && iter.Valid(); iter.Next() {
var weight uint64
stop, weight = cb(iter.Value())
iterations += weight
}
return iterations
}
150 changes: 118 additions & 32 deletions internal/pagination/paginate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,53 +31,139 @@ func (t *testIterator) Valid() bool {

func TestPaginate(t *testing.T) {
testCases := []struct {
name string
iterator pagination.Iterator[int]
perPage uint64
stopValue int
expected uint64
name string
iterator pagination.Iterator[int]
maxIterations uint64
stopValue int
expectedIterations uint64
iterationWeight uint64
}{
{
name: "Empty iterator",
iterator: newTestIterator([]int{}),
perPage: 5,
stopValue: -1,
expected: 0,
name: "Empty iterator",
iterator: newTestIterator([]int{}),
maxIterations: 5,
stopValue: -1,
expectedIterations: 0,
iterationWeight: 1,
},
{
name: "Non-Empty iterator less than perPage",
iterator: newTestIterator([]int{1, 2, 3}),
perPage: 10,
stopValue: -1,
expected: 3,
name: "Non-Empty iterator less than maxIterations",
iterator: newTestIterator([]int{1, 2, 3}),
maxIterations: 10,
stopValue: -1,
expectedIterations: 3,
iterationWeight: 1,
},
{
name: "Non-empty iterator greater than perPage",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
perPage: 5,
stopValue: -1,
expected: 5,
name: "Non-empty iterator greater than maxIterations",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 5,
stopValue: -1,
expectedIterations: 5,
iterationWeight: 1,
},
{
name: "Zero perPage",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
perPage: 0,
stopValue: 6,
expected: 0,
name: "Zero maxIterations",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 0,
stopValue: 6,
expectedIterations: 0,
iterationWeight: 1,
},
{
name: "Non-Empty iterator with stop condition",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
perPage: 10,
stopValue: 3,
expected: 3,
name: "Non-Empty iterator with stop condition",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 10,
stopValue: 3,
expectedIterations: 3,
iterationWeight: 1,
},
{
name: "Empty iterator, >1 iteration weight",
iterator: newTestIterator([]int{}),
maxIterations: 5,
stopValue: -1,
expectedIterations: 0,
iterationWeight: 3,
},
{
name: "Non-Empty iterator less than maxIterations, >1 iteration weight",
iterator: newTestIterator([]int{1, 2, 3}),
maxIterations: 10,
stopValue: -1,
expectedIterations: 9,
iterationWeight: 3,
},
{
name: "Non-empty iterator greater than maxIterations, >1 iteration weight",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 5,
stopValue: -1,
expectedIterations: 6,
iterationWeight: 3,
},
{
name: "Zero maxIterations, >1 iteration weight",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 0,
stopValue: 6,
expectedIterations: 0,
iterationWeight: 3,
},
{
name: "Non-Empty iterator with stop condition, >1 iteration weight",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 10,
stopValue: 3,
expectedIterations: 9,
iterationWeight: 3,
},
{
name: "Empty iterator, 0 iteration weight",
iterator: newTestIterator([]int{}),
maxIterations: 5,
stopValue: -1,
expectedIterations: 0,
iterationWeight: 0,
},
{
name: "Non-Empty iterator less than maxIterations, 0 iteration weight",
iterator: newTestIterator([]int{1, 2, 3}),
maxIterations: 10,
stopValue: -1,
expectedIterations: 9,
iterationWeight: 3,
},
{
name: "Non-empty iterator greater than maxIterations, 0 iteration weight",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 5,
stopValue: -1,
expectedIterations: 6,
iterationWeight: 3,
},
{
name: "Zero maxIterations, 0 iteration weight",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 0,
stopValue: 6,
expectedIterations: 0,
iterationWeight: 3,
},
{
name: "Non-Empty iterator with stop condition, 0 iteration weight",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
maxIterations: 10,
stopValue: 3,
expectedIterations: 9,
iterationWeight: 3,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := pagination.Paginate(tc.iterator, tc.perPage, func(i int) pagination.Stop { return i == tc.stopValue })
require.Equal(t, tc.expected, result)
result := pagination.Paginate(tc.iterator, tc.maxIterations, func(i int) (bool, uint64) { return i == tc.stopValue, tc.iterationWeight })
require.Equal(t, tc.expectedIterations, result)
})
}
}
2 changes: 1 addition & 1 deletion x/incentives/types/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
// LegacyAmino codec. These types are used for Amino JSON serialization.
func RegisterCodec(cdc *codec.LegacyAmino) {
cdc.RegisterConcrete(&MsgCreateGauge{}, "dymensionxyz/dymension/incentives/CreateGauge", nil)
cdc.RegisterConcrete(&MsgAddToGauge{}, "dymensionxyz/dymension/incentives/AddToGauge", nil)
cdc.RegisterConcrete(&MsgAddToGauge{}, "dymensionxyz/dymension/incentives/CalculateGaugeRewards", nil)
}

// RegisterInterfaces registers interfaces and implementations of the incentives module.
Expand Down
8 changes: 4 additions & 4 deletions x/incentives/types/tx.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 22 additions & 6 deletions x/streamer/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dymensionxyz/sdk-utils/utils/uevent"

incentivestypes "github.com/dymensionxyz/dymension/v3/x/incentives/types"
"github.com/dymensionxyz/dymension/v3/x/streamer/types"
)

Expand All @@ -22,7 +23,9 @@ func (k Keeper) EndBlock(ctx sdk.Context) error {

maxIterations := k.GetParams(ctx).MaxIterationsPerBlock
totalIterations := uint64(0)
totalDistributed := sdk.NewCoins()

streamCache := newStreamInfo(streams)
gaugeCache := newGaugeInfo()

for _, p := range epochPointers {
remainIterations := maxIterations - totalIterations
Expand All @@ -31,20 +34,33 @@ func (k Keeper) EndBlock(ctx sdk.Context) error {
break // no more iterations available for this block
}

result := k.DistributeRewards(ctx, p, remainIterations, streams)
result := k.CalculateRewards(ctx, p, remainIterations, streamCache, gaugeCache)

totalIterations += result.Iterations
totalDistributed = totalDistributed.Add(result.DistributedCoins...)
streams = result.FilledStreams

err = k.SaveEpochPointer(ctx, result.NewPointer)
if err != nil {
return fmt.Errorf("save epoch pointer: %w", err)
}
}

// Filter gauges to distribute
toDistribute := k.filterGauges(ctx, gaugeCache)

// Send coins to distribute to the x/incentives module
err = k.bk.SendCoinsFromModuleToModule(ctx, types.ModuleName, incentivestypes.ModuleName, streamCache.totalDistr)
if err != nil {
return fmt.Errorf("send coins: %w", err)
}

// Distribute the rewards
_, err = k.ik.Distribute(ctx, toDistribute)

Check warning

Code scanning / CodeQL

Panic in BeginBock or EndBlock consensus methods Warning

path flow from Begin/EndBlock to a panic call
path flow from Begin/EndBlock to a panic call
path flow from Begin/EndBlock to a panic call
if err != nil {
return fmt.Errorf("distribute: %w", err)
}

// Save stream updates
for _, stream := range streams {
for _, stream := range streamCache.getStreams() {
err = k.SetStream(ctx, &stream)
if err != nil {
return fmt.Errorf("set stream: %w", err)
Expand All @@ -54,7 +70,7 @@ func (k Keeper) EndBlock(ctx sdk.Context) error {
err = uevent.EmitTypedEvent(ctx, &types.EventEndBlock{
Iterations: totalIterations,
MaxIterations: maxIterations,
Distributed: totalDistributed,
Distributed: streamCache.totalDistr,
})
if err != nil {
return fmt.Errorf("emit typed event: %w", err)
Expand Down
Loading

0 comments on commit a335aee

Please sign in to comment.