diff --git a/internal/pagination/paginate.go b/internal/pagination/paginate.go index 5a2df451b..17a49281a 100644 --- a/internal/pagination/paginate.go +++ b/internal/pagination/paginate.go @@ -6,24 +6,21 @@ type Iterator[T any] interface { Valid() bool } -type Stop bool - -const ( - Break Stop = true - Continue Stop = false -) - // Paginate is a function that paginates over an iterator. The callback is executed for each iteration and if it -// returns true, the pagination stops. The function returns the amount of iterations before stopping. -func Paginate[T any](iter Iterator[T], perPage uint64, cb func(T) Stop) uint64 { +// returns true, the pagination stops. The callback also returns the weight of the iteration. That is, one iteration +// may be counted as multiple iterations. For example, in case if the called decides that the iteration is heavy +// or time-consuming. The function returns the amount of iterations before stopping. +func Paginate[T any]( + iter Iterator[T], + perPage uint64, + cb func(T) (stop bool, weight uint64), +) uint64 { iterations := uint64(0) - for ; iterations < perPage && iter.Valid(); iter.Next() { - iterations++ - - stop := cb(iter.Value()) - if stop { - break - } + stop := false + for ; !stop && iterations < perPage && iter.Valid(); iter.Next() { + var weight uint64 + stop, weight = cb(iter.Value()) + iterations += weight } return iterations } diff --git a/internal/pagination/paginate_test.go b/internal/pagination/paginate_test.go index 73a952220..824a346a8 100644 --- a/internal/pagination/paginate_test.go +++ b/internal/pagination/paginate_test.go @@ -31,53 +31,139 @@ func (t *testIterator) Valid() bool { func TestPaginate(t *testing.T) { testCases := []struct { - name string - iterator pagination.Iterator[int] - perPage uint64 - stopValue int - expected uint64 + name string + iterator pagination.Iterator[int] + maxIterations uint64 + stopValue int + expectedIterations uint64 + iterationWeight uint64 }{ { - name: "Empty iterator", - iterator: newTestIterator([]int{}), - perPage: 5, - stopValue: -1, - expected: 0, + name: "Empty iterator", + iterator: newTestIterator([]int{}), + maxIterations: 5, + stopValue: -1, + expectedIterations: 0, + iterationWeight: 1, }, { - name: "Non-Empty iterator less than perPage", - iterator: newTestIterator([]int{1, 2, 3}), - perPage: 10, - stopValue: -1, - expected: 3, + name: "Non-Empty iterator less than maxIterations", + iterator: newTestIterator([]int{1, 2, 3}), + maxIterations: 10, + stopValue: -1, + expectedIterations: 3, + iterationWeight: 1, }, { - name: "Non-empty iterator greater than perPage", - iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), - perPage: 5, - stopValue: -1, - expected: 5, + name: "Non-empty iterator greater than maxIterations", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 5, + stopValue: -1, + expectedIterations: 5, + iterationWeight: 1, }, { - name: "Zero perPage", - iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), - perPage: 0, - stopValue: 6, - expected: 0, + name: "Zero maxIterations", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 0, + stopValue: 6, + expectedIterations: 0, + iterationWeight: 1, }, { - name: "Non-Empty iterator with stop condition", - iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), - perPage: 10, - stopValue: 3, - expected: 3, + name: "Non-Empty iterator with stop condition", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 10, + stopValue: 3, + expectedIterations: 3, + iterationWeight: 1, + }, + { + name: "Empty iterator, >1 iteration weight", + iterator: newTestIterator([]int{}), + maxIterations: 5, + stopValue: -1, + expectedIterations: 0, + iterationWeight: 3, + }, + { + name: "Non-Empty iterator less than maxIterations, >1 iteration weight", + iterator: newTestIterator([]int{1, 2, 3}), + maxIterations: 10, + stopValue: -1, + expectedIterations: 9, + iterationWeight: 3, + }, + { + name: "Non-empty iterator greater than maxIterations, >1 iteration weight", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 5, + stopValue: -1, + expectedIterations: 6, + iterationWeight: 3, + }, + { + name: "Zero maxIterations, >1 iteration weight", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 0, + stopValue: 6, + expectedIterations: 0, + iterationWeight: 3, + }, + { + name: "Non-Empty iterator with stop condition, >1 iteration weight", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 10, + stopValue: 3, + expectedIterations: 9, + iterationWeight: 3, + }, + { + name: "Empty iterator, 0 iteration weight", + iterator: newTestIterator([]int{}), + maxIterations: 5, + stopValue: -1, + expectedIterations: 0, + iterationWeight: 0, + }, + { + name: "Non-Empty iterator less than maxIterations, 0 iteration weight", + iterator: newTestIterator([]int{1, 2, 3}), + maxIterations: 10, + stopValue: -1, + expectedIterations: 9, + iterationWeight: 3, + }, + { + name: "Non-empty iterator greater than maxIterations, 0 iteration weight", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 5, + stopValue: -1, + expectedIterations: 6, + iterationWeight: 3, + }, + { + name: "Zero maxIterations, 0 iteration weight", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 0, + stopValue: 6, + expectedIterations: 0, + iterationWeight: 3, + }, + { + name: "Non-Empty iterator with stop condition, 0 iteration weight", + iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}), + maxIterations: 10, + stopValue: 3, + expectedIterations: 9, + iterationWeight: 3, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result := pagination.Paginate(tc.iterator, tc.perPage, func(i int) pagination.Stop { return i == tc.stopValue }) - require.Equal(t, tc.expected, result) + result := pagination.Paginate(tc.iterator, tc.maxIterations, func(i int) (bool, uint64) { return i == tc.stopValue, tc.iterationWeight }) + require.Equal(t, tc.expectedIterations, result) }) } } diff --git a/x/incentives/types/codec.go b/x/incentives/types/codec.go index 9bd8e9f4f..3a6d47581 100644 --- a/x/incentives/types/codec.go +++ b/x/incentives/types/codec.go @@ -16,7 +16,7 @@ var ( // LegacyAmino codec. These types are used for Amino JSON serialization. func RegisterCodec(cdc *codec.LegacyAmino) { cdc.RegisterConcrete(&MsgCreateGauge{}, "dymensionxyz/dymension/incentives/CreateGauge", nil) - cdc.RegisterConcrete(&MsgAddToGauge{}, "dymensionxyz/dymension/incentives/AddToGauge", nil) + cdc.RegisterConcrete(&MsgAddToGauge{}, "dymensionxyz/dymension/incentives/CalculateGaugeRewards", nil) } // RegisterInterfaces registers interfaces and implementations of the incentives module. diff --git a/x/incentives/types/tx.pb.go b/x/incentives/types/tx.pb.go index 9888b42d1..e36de7526 100644 --- a/x/incentives/types/tx.pb.go +++ b/x/incentives/types/tx.pb.go @@ -357,7 +357,7 @@ func (c *msgClient) CreateGauge(ctx context.Context, in *MsgCreateGauge, opts .. func (c *msgClient) AddToGauge(ctx context.Context, in *MsgAddToGauge, opts ...grpc.CallOption) (*MsgAddToGaugeResponse, error) { out := new(MsgAddToGaugeResponse) - err := c.cc.Invoke(ctx, "/dymensionxyz.dymension.incentives.Msg/AddToGauge", in, out, opts...) + err := c.cc.Invoke(ctx, "/dymensionxyz.dymension.incentives.Msg/CalculateGaugeRewards", in, out, opts...) if err != nil { return nil, err } @@ -378,7 +378,7 @@ func (*UnimplementedMsgServer) CreateGauge(ctx context.Context, req *MsgCreateGa return nil, status.Errorf(codes.Unimplemented, "method CreateGauge not implemented") } func (*UnimplementedMsgServer) AddToGauge(ctx context.Context, req *MsgAddToGauge) (*MsgAddToGaugeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method AddToGauge not implemented") + return nil, status.Errorf(codes.Unimplemented, "method CalculateGaugeRewards not implemented") } func RegisterMsgServer(s grpc1.Server, srv MsgServer) { @@ -413,7 +413,7 @@ func _Msg_AddToGauge_Handler(srv interface{}, ctx context.Context, dec func(inte } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/dymensionxyz.dymension.incentives.Msg/AddToGauge", + FullMethod: "/dymensionxyz.dymension.incentives.Msg/CalculateGaugeRewards", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(MsgServer).AddToGauge(ctx, req.(*MsgAddToGauge)) @@ -430,7 +430,7 @@ var _Msg_serviceDesc = grpc.ServiceDesc{ Handler: _Msg_CreateGauge_Handler, }, { - MethodName: "AddToGauge", + MethodName: "CalculateGaugeRewards", Handler: _Msg_AddToGauge_Handler, }, }, diff --git a/x/streamer/keeper/abci.go b/x/streamer/keeper/abci.go index c86de13f1..a979d0ab6 100644 --- a/x/streamer/keeper/abci.go +++ b/x/streamer/keeper/abci.go @@ -6,6 +6,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dymensionxyz/sdk-utils/utils/uevent" + incentivestypes "github.com/dymensionxyz/dymension/v3/x/incentives/types" "github.com/dymensionxyz/dymension/v3/x/streamer/types" ) @@ -22,7 +23,9 @@ func (k Keeper) EndBlock(ctx sdk.Context) error { maxIterations := k.GetParams(ctx).MaxIterationsPerBlock totalIterations := uint64(0) - totalDistributed := sdk.NewCoins() + + streamCache := newStreamInfo(streams) + gaugeCache := newGaugeInfo() for _, p := range epochPointers { remainIterations := maxIterations - totalIterations @@ -31,11 +34,9 @@ func (k Keeper) EndBlock(ctx sdk.Context) error { break // no more iterations available for this block } - result := k.DistributeRewards(ctx, p, remainIterations, streams) + result := k.CalculateRewards(ctx, p, remainIterations, streamCache, gaugeCache) totalIterations += result.Iterations - totalDistributed = totalDistributed.Add(result.DistributedCoins...) - streams = result.FilledStreams err = k.SaveEpochPointer(ctx, result.NewPointer) if err != nil { @@ -43,8 +44,23 @@ func (k Keeper) EndBlock(ctx sdk.Context) error { } } + // Filter gauges to distribute + toDistribute := k.filterGauges(ctx, gaugeCache) + + // Send coins to distribute to the x/incentives module + err = k.bk.SendCoinsFromModuleToModule(ctx, types.ModuleName, incentivestypes.ModuleName, streamCache.totalDistr) + if err != nil { + return fmt.Errorf("send coins: %w", err) + } + + // Distribute the rewards + _, err = k.ik.Distribute(ctx, toDistribute) + if err != nil { + return fmt.Errorf("distribute: %w", err) + } + // Save stream updates - for _, stream := range streams { + for _, stream := range streamCache.getStreams() { err = k.SetStream(ctx, &stream) if err != nil { return fmt.Errorf("set stream: %w", err) @@ -54,7 +70,7 @@ func (k Keeper) EndBlock(ctx sdk.Context) error { err = uevent.EmitTypedEvent(ctx, &types.EventEndBlock{ Iterations: totalIterations, MaxIterations: maxIterations, - Distributed: totalDistributed, + Distributed: streamCache.totalDistr, }) if err != nil { return fmt.Errorf("emit typed event: %w", err) diff --git a/x/streamer/keeper/abci_test.go b/x/streamer/keeper/abci_test.go index 6ef87c046..c953429bd 100644 --- a/x/streamer/keeper/abci_test.go +++ b/x/streamer/keeper/abci_test.go @@ -7,73 +7,93 @@ import ( "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dymensionxyz/dymension/v3/app/apptesting" "github.com/dymensionxyz/dymension/v3/x/streamer/types" ) func (s *KeeperTestSuite) TestProcessEpochPointer() { + addrs := apptesting.CreateRandomAccounts(2) tests := []struct { name string maxIterationsPerBlock uint64 numGauges int - blocksInEpoch int + blocksToProcess int + initialLockups []lockup streams []types.Stream expectedBlockResults []blockResults }{ { + // In this test, the number of gauges is less than the number of iterations. We simulate the + // execution of the first block of the epoch: + // 1. There are 4 streams, and each streams holds 200 stake + // 2. Each stream has 4 gauges with 25% weight each => the number of gauges is 16 + // 3. We start with shorter epochs, so firstly we fill streams with the hour epoch (1 and 4) + // 4. After, we continue with the longer streams (2 and 3) + // 5. There are 9 iterations limit per block, so we fill first 9 gauges: + // * 4 gauges from stream 1 + // * 4 gauges from stream 4 + // * 1 gauge from stream 2 + // 6. Each gauge gets 25% of the stream => 50 stake => 50 * 9 = 450 stake is totally distributed + // 7. Initially, we have 2 lockup owners with 100 stake locked each, so each of them gets 50% of rewards + // of the stake denom => every owner will get 225 stake name: "1 block in the epoch", maxIterationsPerBlock: 9, numGauges: 16, - blocksInEpoch: 1, + blocksToProcess: 1, + initialLockups: []lockup{ // every lockup owner receives 50% of the gauge rewards + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + }, streams: []types.Stream{ { Id: 1, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 1, Weight: math.NewInt(25)}, - {GaugeId: 2, Weight: math.NewInt(25)}, - {GaugeId: 3, Weight: math.NewInt(25)}, - {GaugeId: 4, Weight: math.NewInt(25)}, + {GaugeId: 1, Weight: math.NewInt(50)}, + {GaugeId: 2, Weight: math.NewInt(50)}, + {GaugeId: 3, Weight: math.NewInt(50)}, + {GaugeId: 4, Weight: math.NewInt(50)}, }, }, }, { Id: 2, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 5, Weight: math.NewInt(25)}, - {GaugeId: 6, Weight: math.NewInt(25)}, - {GaugeId: 7, Weight: math.NewInt(25)}, - {GaugeId: 8, Weight: math.NewInt(25)}, + {GaugeId: 5, Weight: math.NewInt(50)}, + {GaugeId: 6, Weight: math.NewInt(50)}, + {GaugeId: 7, Weight: math.NewInt(50)}, + {GaugeId: 8, Weight: math.NewInt(50)}, }, }, }, { Id: 3, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 9, Weight: math.NewInt(25)}, - {GaugeId: 10, Weight: math.NewInt(25)}, - {GaugeId: 11, Weight: math.NewInt(25)}, - {GaugeId: 12, Weight: math.NewInt(25)}, + {GaugeId: 9, Weight: math.NewInt(50)}, + {GaugeId: 10, Weight: math.NewInt(50)}, + {GaugeId: 11, Weight: math.NewInt(50)}, + {GaugeId: 12, Weight: math.NewInt(50)}, }, }, }, { Id: 4, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 13, Weight: math.NewInt(25)}, - {GaugeId: 14, Weight: math.NewInt(25)}, - {GaugeId: 15, Weight: math.NewInt(25)}, - {GaugeId: 16, Weight: math.NewInt(25)}, + {GaugeId: 13, Weight: math.NewInt(50)}, + {GaugeId: 14, Weight: math.NewInt(50)}, + {GaugeId: 15, Weight: math.NewInt(50)}, + {GaugeId: 16, Weight: math.NewInt(50)}, }, }, }, @@ -103,19 +123,19 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {streamID: 3, coins: nil}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream - {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {gaugeID: 6, coins: nil}, {gaugeID: 7, coins: nil}, {gaugeID: 8, coins: nil}, @@ -125,10 +145,14 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { {gaugeID: 11, coins: nil}, {gaugeID: 12, coins: nil}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 225))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 225))}, }, }, }, @@ -137,57 +161,61 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { name: "Several blocks in the epoch", maxIterationsPerBlock: 5, numGauges: 16, - blocksInEpoch: 2, + blocksToProcess: 2, + initialLockups: []lockup{ // every lockup owner receives 50% of the gauge rewards + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + }, streams: []types.Stream{ { Id: 1, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 1, Weight: math.NewInt(25)}, - {GaugeId: 2, Weight: math.NewInt(25)}, - {GaugeId: 3, Weight: math.NewInt(25)}, - {GaugeId: 4, Weight: math.NewInt(25)}, + {GaugeId: 1, Weight: math.NewInt(50)}, + {GaugeId: 2, Weight: math.NewInt(50)}, + {GaugeId: 3, Weight: math.NewInt(50)}, + {GaugeId: 4, Weight: math.NewInt(50)}, }, }, }, { Id: 2, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 5, Weight: math.NewInt(25)}, - {GaugeId: 6, Weight: math.NewInt(25)}, - {GaugeId: 7, Weight: math.NewInt(25)}, - {GaugeId: 8, Weight: math.NewInt(25)}, + {GaugeId: 5, Weight: math.NewInt(50)}, + {GaugeId: 6, Weight: math.NewInt(50)}, + {GaugeId: 7, Weight: math.NewInt(50)}, + {GaugeId: 8, Weight: math.NewInt(50)}, }, }, }, { Id: 3, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 9, Weight: math.NewInt(25)}, - {GaugeId: 10, Weight: math.NewInt(25)}, - {GaugeId: 11, Weight: math.NewInt(25)}, - {GaugeId: 12, Weight: math.NewInt(25)}, + {GaugeId: 9, Weight: math.NewInt(50)}, + {GaugeId: 10, Weight: math.NewInt(50)}, + {GaugeId: 11, Weight: math.NewInt(50)}, + {GaugeId: 12, Weight: math.NewInt(50)}, }, }, }, { Id: 4, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 13, Weight: math.NewInt(25)}, - {GaugeId: 14, Weight: math.NewInt(25)}, - {GaugeId: 15, Weight: math.NewInt(25)}, - {GaugeId: 16, Weight: math.NewInt(25)}, + {GaugeId: 13, Weight: math.NewInt(50)}, + {GaugeId: 14, Weight: math.NewInt(50)}, + {GaugeId: 15, Weight: math.NewInt(50)}, + {GaugeId: 16, Weight: math.NewInt(50)}, }, }, }, @@ -217,17 +245,17 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, {streamID: 2, coins: nil}, {streamID: 3, coins: nil}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream {gaugeID: 5, coins: nil}, {gaugeID: 6, coins: nil}, @@ -239,11 +267,15 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { {gaugeID: 11, coins: nil}, {gaugeID: 12, coins: nil}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {gaugeID: 14, coins: nil}, {gaugeID: 15, coins: nil}, {gaugeID: 16, coins: nil}, }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 125))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 125))}, + }, }, { height: 1, @@ -269,20 +301,20 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 50))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, {streamID: 3, coins: nil}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream - {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {gaugeID: 7, coins: nil}, {gaugeID: 8, coins: nil}, // 3rd stream @@ -291,10 +323,14 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { {gaugeID: 11, coins: nil}, {gaugeID: 12, coins: nil}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 250))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 250))}, }, }, }, @@ -303,12 +339,16 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { name: "Send all reward in one single block", maxIterationsPerBlock: 5, numGauges: 4, - blocksInEpoch: 5, + blocksToProcess: 5, + initialLockups: []lockup{ // every lockup owner receives 50% of the gauge rewards + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + }, streams: []types.Stream{ { Id: 1, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ {GaugeId: 1, Weight: math.NewInt(1)}, @@ -318,7 +358,7 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { { Id: 2, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ {GaugeId: 2, Weight: math.NewInt(1)}, @@ -328,7 +368,7 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { { Id: 3, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ {GaugeId: 3, Weight: math.NewInt(1)}, @@ -338,7 +378,7 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { { Id: 4, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ {GaugeId: 4, Weight: math.NewInt(1)}, @@ -371,16 +411,20 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, - {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, - {streamID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, + {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, + {streamID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, }, gauges: []gaugeCoins{ - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 1))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 2))}, + }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 4))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 4))}, }, }, }, @@ -389,57 +433,61 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { name: "Many blocks", maxIterationsPerBlock: 3, numGauges: 16, - blocksInEpoch: 100, + blocksToProcess: 200, + initialLockups: []lockup{ // every lockup owner receives 50% of the gauge rewards + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, + }, streams: []types.Stream{ { Id: 1, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 1, Weight: math.NewInt(25)}, - {GaugeId: 2, Weight: math.NewInt(25)}, - {GaugeId: 3, Weight: math.NewInt(25)}, - {GaugeId: 4, Weight: math.NewInt(25)}, + {GaugeId: 1, Weight: math.NewInt(50)}, + {GaugeId: 2, Weight: math.NewInt(50)}, + {GaugeId: 3, Weight: math.NewInt(50)}, + {GaugeId: 4, Weight: math.NewInt(50)}, }, }, }, { Id: 2, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 5, Weight: math.NewInt(25)}, - {GaugeId: 6, Weight: math.NewInt(25)}, - {GaugeId: 7, Weight: math.NewInt(25)}, - {GaugeId: 8, Weight: math.NewInt(25)}, + {GaugeId: 5, Weight: math.NewInt(50)}, + {GaugeId: 6, Weight: math.NewInt(50)}, + {GaugeId: 7, Weight: math.NewInt(50)}, + {GaugeId: 8, Weight: math.NewInt(50)}, }, }, }, { Id: 3, DistrEpochIdentifier: "day", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 9, Weight: math.NewInt(25)}, - {GaugeId: 10, Weight: math.NewInt(25)}, - {GaugeId: 11, Weight: math.NewInt(25)}, - {GaugeId: 12, Weight: math.NewInt(25)}, + {GaugeId: 9, Weight: math.NewInt(50)}, + {GaugeId: 10, Weight: math.NewInt(50)}, + {GaugeId: 11, Weight: math.NewInt(50)}, + {GaugeId: 12, Weight: math.NewInt(50)}, }, }, }, { Id: 4, DistrEpochIdentifier: "hour", - Coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100)), + Coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200)), DistributeTo: &types.DistrInfo{ Records: []types.DistrRecord{ - {GaugeId: 13, Weight: math.NewInt(25)}, - {GaugeId: 14, Weight: math.NewInt(25)}, - {GaugeId: 15, Weight: math.NewInt(25)}, - {GaugeId: 16, Weight: math.NewInt(25)}, + {GaugeId: 13, Weight: math.NewInt(50)}, + {GaugeId: 14, Weight: math.NewInt(50)}, + {GaugeId: 15, Weight: math.NewInt(50)}, + {GaugeId: 16, Weight: math.NewInt(50)}, }, }, }, @@ -469,16 +517,16 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 75))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 150))}, {streamID: 2, coins: nil}, {streamID: 3, coins: nil}, {streamID: 4, coins: nil}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {gaugeID: 4, coins: nil}, // 2nd stream {gaugeID: 5, coins: nil}, @@ -496,6 +544,10 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { {gaugeID: 15, coins: nil}, {gaugeID: 16, coins: nil}, }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 75))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 75))}, + }, }, { height: 1, @@ -521,17 +573,17 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, {streamID: 2, coins: nil}, {streamID: 3, coins: nil}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 50))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 100))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream {gaugeID: 5, coins: nil}, {gaugeID: 6, coins: nil}, @@ -543,11 +595,15 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { {gaugeID: 11, coins: nil}, {gaugeID: 12, coins: nil}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {gaugeID: 15, coins: nil}, {gaugeID: 16, coins: nil}, }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 150))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 150))}, + }, }, { height: 2, @@ -573,19 +629,19 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {streamID: 3, coins: nil}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream - {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {gaugeID: 6, coins: nil}, {gaugeID: 7, coins: nil}, {gaugeID: 8, coins: nil}, @@ -595,10 +651,14 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { {gaugeID: 11, coins: nil}, {gaugeID: 12, coins: nil}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 225))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 225))}, }, }, { @@ -625,32 +685,36 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, {streamID: 3, coins: nil}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream - {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 7, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 8, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 7, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 8, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 3rd stream {gaugeID: 9, coins: nil}, {gaugeID: 10, coins: nil}, {gaugeID: 11, coins: nil}, {gaugeID: 12, coins: nil}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 300))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 300))}, }, }, { @@ -677,32 +741,36 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 75))}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 150))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream - {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 7, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 8, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 7, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 8, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 3rd stream - {gaugeID: 9, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 10, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 11, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 9, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 10, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 11, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, {gaugeID: 12, coins: nil}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 375))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 375))}, }, }, { @@ -729,32 +797,36 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { }, }, distributedCoins: []distributedCoins{ - {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, - {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 100))}, + {streamID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, + {streamID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 200))}, }, gauges: []gaugeCoins{ // 1st stream - {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 1, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 2, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 3, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 4, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 2nd stream - {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 7, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 8, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 5, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 6, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 7, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 8, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 3rd stream - {gaugeID: 9, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 10, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 11, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 12, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 9, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 10, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 11, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 12, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, // 4th stream - {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, - {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("udym", 25))}, + {gaugeID: 13, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 14, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 15, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + {gaugeID: 16, coins: sdk.NewCoins(sdk.NewInt64Coin("stake", 50))}, + }, + lockups: []lockup{ + {owner: addrs[0], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 400))}, + {owner: addrs[1], balance: sdk.NewCoins(sdk.NewInt64Coin("stake", 400))}, }, }, }, @@ -768,7 +840,11 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { s.CreateGaugesUntil(tc.numGauges) - s.Require().LessOrEqual(len(tc.expectedBlockResults), tc.blocksInEpoch) + for _, lock := range tc.initialLockups { + s.LockTokens(lock.owner, lock.balance) + } + + s.Require().LessOrEqual(len(tc.expectedBlockResults), tc.blocksToProcess) // Update module params params := s.App.StreamerKeeper.GetParams(s.Ctx) @@ -785,7 +861,7 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { err = s.App.StreamerKeeper.BeforeEpochStart(s.Ctx, "day") s.Require().NoError(err) - for i := range tc.blocksInEpoch { + for i := range tc.blocksToProcess { err = s.App.StreamerKeeper.EndBlock(s.Ctx) s.Require().NoError(err) @@ -793,7 +869,7 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { gauges := s.App.IncentivesKeeper.GetGauges(s.Ctx) actualGauges := make(gaugeCoinsSlice, 0, len(gauges)) for _, gauge := range gauges { - actualGauges = append(actualGauges, gaugeCoins{gaugeID: gauge.Id, coins: gauge.Coins}) + actualGauges = append(actualGauges, gaugeCoins{gaugeID: gauge.Id, coins: gauge.DistributedCoins}) } // Check block results @@ -811,7 +887,13 @@ func (s *KeeperTestSuite) TestProcessEpochPointer() { s.Require().Equal(expected.epochPointers, pointers) // Verify gauges are rewarded. Equality here is important! - s.Require().Equal(expected.gauges, actualGauges, "block height: %d\nexpect: %s\nactual: %s", i, expected, actualGauges) + s.Require().Equal(expected.gauges, actualGauges, "block height: %d\nexpect: %s\nactual: %s", i, expected.gauges, actualGauges) + + // Verify lockup owner are rewarded + for _, lock := range expected.lockups { + actualBalance := s.App.BankKeeper.GetAllBalances(s.Ctx, lock.owner) + s.Require().Equal(lock.balance, actualBalance) + } // Verify streams are valid active := s.App.StreamerKeeper.GetActiveStreams(s.Ctx) @@ -874,11 +956,17 @@ func (s distributedCoinsSlice) String() string { return result } +type lockup struct { + owner sdk.AccAddress + balance sdk.Coins +} + type blockResults struct { height uint64 epochPointers []types.EpochPointer distributedCoins distributedCoinsSlice gauges gaugeCoinsSlice + lockups []lockup } func (b blockResults) String() string { diff --git a/x/streamer/keeper/distribute.go b/x/streamer/keeper/distribute.go index 698c0043a..37c25f3e7 100644 --- a/x/streamer/keeper/distribute.go +++ b/x/streamer/keeper/distribute.go @@ -6,51 +6,42 @@ import ( "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/dymensionxyz/dymension/v3/internal/pagination" + incentivestypes "github.com/dymensionxyz/dymension/v3/x/incentives/types" "github.com/dymensionxyz/dymension/v3/x/streamer/types" ) -func (k Keeper) DistributeToGauge(ctx sdk.Context, coins sdk.Coins, record types.DistrRecord, totalWeight math.Int) (sdk.Coins, error) { +func (k Keeper) CalculateGaugeRewards(ctx sdk.Context, coins sdk.Coins, record types.DistrRecord, totalWeight math.Int) (sdk.Coins, error) { if coins.Empty() { - return sdk.Coins{}, fmt.Errorf("coins to allocate cannot be empty") + return nil, fmt.Errorf("coins to allocate cannot be empty") } if totalWeight.IsZero() { - return sdk.Coins{}, fmt.Errorf("distribution total weight cannot be zero") + return nil, fmt.Errorf("distribution total weight cannot be zero") } - totalAllocated := sdk.NewCoins() + weightDec := sdk.NewDecFromInt(record.Weight) + totalDec := sdk.NewDecFromInt(totalWeight) + rewards := sdk.NewCoins() + for _, coin := range coins { if coin.IsZero() { continue } assetAmountDec := sdk.NewDecFromInt(coin.Amount) - weightDec := sdk.NewDecFromInt(record.Weight) - totalDec := sdk.NewDecFromInt(totalWeight) allocatingAmount := assetAmountDec.Mul(weightDec.Quo(totalDec)).TruncateInt() // when weight is too small and no amount is allocated, just skip this to avoid zero coin send issues if !allocatingAmount.IsPositive() { - k.Logger(ctx).Info(fmt.Sprintf("allocating amount for (%d, %s) record is not positive", record.GaugeId, record.Weight.String())) + k.Logger(ctx).Info(fmt.Sprintf("allocating amount for gauge id '%d' with weight '%s' is not positive", record.GaugeId, record.Weight.String())) continue } - _, err := k.ik.GetGaugeByID(ctx, record.GaugeId) - if err != nil { - return sdk.Coins{}, fmt.Errorf("get gauge %d: %w", record.GaugeId, err) - } - allocatedCoin := sdk.Coin{Denom: coin.Denom, Amount: allocatingAmount} - err = k.ik.AddToGaugeRewards(ctx, k.ak.GetModuleAddress(types.ModuleName), sdk.NewCoins(allocatedCoin), record.GaugeId) - if err != nil { - return sdk.Coins{}, fmt.Errorf("add rewards to gauge %d: %w", record.GaugeId, err) - } - - totalAllocated = totalAllocated.Add(allocatedCoin) + rewards = rewards.Add(allocatedCoin) } - return totalAllocated, nil + return rewards, nil } // UpdateStreamAtEpochStart updates the stream for a new epoch. Streams distribute rewards post factum. @@ -95,55 +86,151 @@ func (k Keeper) UpdateStreamAtEpochStart(ctx sdk.Context, stream types.Stream) ( return stream, nil } +type streamInfo struct { + nextID int + streamIDToID map[uint64]int + IDToStream []types.Stream + totalDistr sdk.Coins +} + +func newStreamInfo(streams []types.Stream) *streamInfo { + info := &streamInfo{ + nextID: 0, + streamIDToID: make(map[uint64]int), + IDToStream: make([]types.Stream, 0), + } + for _, stream := range streams { + info.addDistrCoins(stream, sdk.Coins{}) + } + return info +} + +func (i *streamInfo) addDistrCoins(stream types.Stream, coins sdk.Coins) { + if id, ok := i.streamIDToID[stream.Id]; ok { + i.IDToStream[id].DistributedCoins = i.IDToStream[id].DistributedCoins.Add(coins...) + } else { + newID := i.nextID + i.nextID++ + i.streamIDToID[stream.Id] = newID + stream.DistributedCoins = stream.DistributedCoins.Add(coins...) + i.IDToStream = append(i.IDToStream, stream) + } + i.totalDistr = i.totalDistr.Add(coins...) +} + +func (i *streamInfo) getStreams() []types.Stream { + return i.IDToStream +} + +type gaugeInfo struct { + nextID int + gaugeIDToID map[uint64]int + IDToDistrCoins []sdk.Coins +} + +func newGaugeInfo() *gaugeInfo { + return &gaugeInfo{ + nextID: 0, + gaugeIDToID: make(map[uint64]int), + IDToDistrCoins: make([]sdk.Coins, 0), + } +} + +func (i *gaugeInfo) addDistrCoins(gaugeID uint64, coins sdk.Coins) { + if id, ok := i.gaugeIDToID[gaugeID]; ok { + i.IDToDistrCoins[id] = i.IDToDistrCoins[id].Add(coins...) + } else { + newID := i.nextID + i.nextID++ + i.gaugeIDToID[gaugeID] = newID + i.IDToDistrCoins = append(i.IDToDistrCoins, coins) + } +} + +type gaugeCoins struct { + gaugeID uint64 + coins sdk.Coins +} + +func (i *gaugeInfo) getGaugeCoins() []gaugeCoins { + result := make([]gaugeCoins, 0, len(i.gaugeIDToID)) + for gaugeID, id := range i.gaugeIDToID { + result = append(result, gaugeCoins{ + gaugeID: gaugeID, + coins: i.IDToDistrCoins[id], + }) + } + return result +} + +// filterGauges filters gauges cache and applies gauge balance updates. Skips gauges on errors. +func (k Keeper) filterGauges(ctx sdk.Context, gaugeInfo *gaugeInfo) []incentivestypes.Gauge { + filtered := make([]incentivestypes.Gauge, 0) + for _, gc := range gaugeInfo.getGaugeCoins() { + gauge, err := k.ik.GetGaugeByID(ctx, gc.gaugeID) + if err != nil { + // we don't want to fail in this case + k.Logger(ctx). + With("gaugeID", gc.gaugeID, "error", err.Error()). + Error("Can't distribute to gauge: failed to get gauge") + continue + } + + finished := gauge.IsFinishedGauge(ctx.BlockTime()) + if finished { + // we don't want to fail in this case + k.Logger(ctx). + With("gaugeID", gc.gaugeID). + Error("Can't distribute to gauge: gauge is finished") + continue + } + + // Update gauge balance + gauge.Coins = gauge.Coins.Add(gc.coins...) + filtered = append(filtered, *gauge) + } + return filtered +} + type DistributeRewardsResult struct { - NewPointer types.EpochPointer - FilledStreams []types.Stream - DistributedCoins sdk.Coins - Iterations uint64 + NewPointer types.EpochPointer + Iterations uint64 } -// DistributeRewards distributes all streams rewards to the corresponding gauges starting with -// the specified pointer and considering the limit. -func (k Keeper) DistributeRewards( +// CalculateRewards calculates rewards for streams and corresponding gauges. Is starts processing gauges from +// the specified pointer and considering the limit. This method doesn't have any state updates and validations +// (for example, if the gauge exists or is unfinished), it only calculates rewards and fills respective caches. +func (k Keeper) CalculateRewards( ctx sdk.Context, pointer types.EpochPointer, limit uint64, - streams []types.Stream, + streamInfo *streamInfo, + gaugeInfo *gaugeInfo, ) DistributeRewardsResult { - totalDistributed := sdk.NewCoins() - - // Temporary map for convenient calculations - streamUpdates := make(map[uint64]sdk.Coins, len(streams)) - - // Distribute to all the remaining gauges that are left after EndBlock - newPointer, iterations := IterateEpochPointer(pointer, streams, limit, func(v StreamGauge) pagination.Stop { - distributed, errX := k.DistributeToGauge(ctx, v.Stream.EpochCoins, v.Gauge, v.Stream.DistributeTo.TotalWeight) - if errX != nil { + newPointer, iterations := IterateEpochPointer(pointer, streamInfo.getStreams(), limit, func(v StreamGauge) (stop bool, weight uint64) { + added, err := k.CalculateGaugeRewards( + ctx, + v.Stream.EpochCoins, + v.Gauge, + v.Stream.DistributeTo.TotalWeight, + ) + if err != nil { // Ignore this gauge k.Logger(ctx). - With("streamID", v.Stream.Id, "gaugeID", v.Gauge.GaugeId, "error", errX.Error()). + With("streamID", v.Stream.Id, "gaugeID", v.Gauge.GaugeId, "error", err.Error()). Error("Failed to distribute to gauge") + return false, 0 // weight = 0, consider this operation as it is free } - totalDistributed = totalDistributed.Add(distributed...) - // Update distributed coins for the stream - update := streamUpdates[v.Stream.Id] - update = update.Add(distributed...) - streamUpdates[v.Stream.Id] = update + streamInfo.addDistrCoins(v.Stream, added) + gaugeInfo.addDistrCoins(v.Gauge.GaugeId, added) - return pagination.Continue + return false, 1 }) - for i, s := range streams { - s.DistributedCoins = s.DistributedCoins.Add(streamUpdates[s.Id]...) - streams[i] = s - } - return DistributeRewardsResult{ - NewPointer: newPointer, - FilledStreams: streams, // Make sure that the returning slice is always sorted - DistributedCoins: totalDistributed, - Iterations: iterations, + NewPointer: newPointer, + Iterations: iterations, } } diff --git a/x/streamer/keeper/distribute_test.go b/x/streamer/keeper/distribute_test.go index 38b847750..2d0d8f73e 100644 --- a/x/streamer/keeper/distribute_test.go +++ b/x/streamer/keeper/distribute_test.go @@ -234,6 +234,10 @@ func (suite *KeeperTestSuite) TestSponsoredDistribute() { suite.Run(tc.name, func() { suite.SetupTest() + // We must create at least one lock, otherwise distribution won't work + lockOwner := apptesting.CreateRandomAccounts(1)[0] + suite.LockTokens(lockOwner, sdk.NewCoins(sdk.NewInt64Coin("stake", 100))) + // Cast an initial vote if tc.hasInitialDistr { suite.Vote(tc.initialVote, math.NewInt(1_000_000)) diff --git a/x/streamer/keeper/hooks.go b/x/streamer/keeper/hooks.go index 437f39401..2eeabca8f 100644 --- a/x/streamer/keeper/hooks.go +++ b/x/streamer/keeper/hooks.go @@ -8,6 +8,7 @@ import ( gammtypes "github.com/osmosis-labs/osmosis/v15/x/gamm/types" ctypes "github.com/dymensionxyz/dymension/v3/x/common/types" + incentivestypes "github.com/dymensionxyz/dymension/v3/x/incentives/types" rollapptypes "github.com/dymensionxyz/dymension/v3/x/rollapp/types" "github.com/dymensionxyz/dymension/v3/x/streamer/types" @@ -93,10 +94,28 @@ func (k Keeper) AfterEpochEnd(ctx sdk.Context, epochIdentifier string) (sdk.Coin return sdk.Coins{}, fmt.Errorf("get epoch pointer for epoch '%s': %w", epochIdentifier, err) } - distrResult := k.DistributeRewards(ctx, epochPointer, types.IterationsNoLimit, toDistribute) + streamCache := newStreamInfo(toDistribute) + gaugeCache := newGaugeInfo() + + distrResult := k.CalculateRewards(ctx, epochPointer, types.IterationsNoLimit, streamCache, gaugeCache) + + // Filter gauges to distribute + toDistributeGauges := k.filterGauges(ctx, gaugeCache) + + // Send coins to distribute to the x/incentives module + err = k.bk.SendCoinsFromModuleToModule(ctx, types.ModuleName, incentivestypes.ModuleName, streamCache.totalDistr) + if err != nil { + return nil, fmt.Errorf("send coins: %w", err) + } + + // Distribute the rewards + _, err = k.ik.Distribute(ctx, toDistributeGauges) + if err != nil { + return nil, fmt.Errorf("distribute: %w", err) + } // Update streams with respect to a new epoch and save them - for _, s := range distrResult.FilledStreams { + for _, s := range streamCache.getStreams() { // Save the stream err = k.SetStream(ctx, &s) if err != nil { @@ -113,15 +132,15 @@ func (k Keeper) AfterEpochEnd(ctx sdk.Context, epochIdentifier string) (sdk.Coin err = ctx.EventManager().EmitTypedEvent(&types.EventEpochEnd{ Iterations: distrResult.Iterations, - Distributed: distrResult.DistributedCoins, + Distributed: streamCache.totalDistr, }) if err != nil { return sdk.Coins{}, fmt.Errorf("emit typed event: %w", err) } - ctx.Logger().Info("Streamer distributed coins", "amount", distrResult.DistributedCoins.String()) + ctx.Logger().Info("Streamer distributed coins", "amount", streamCache.totalDistr.String()) - return distrResult.DistributedCoins, nil + return streamCache.totalDistr, nil } // BeforeEpochStart is the epoch start hook. diff --git a/x/streamer/keeper/hooks_test.go b/x/streamer/keeper/hooks_test.go index 6a78957d4..149329926 100644 --- a/x/streamer/keeper/hooks_test.go +++ b/x/streamer/keeper/hooks_test.go @@ -6,6 +6,7 @@ import ( "cosmossdk.io/math" "github.com/stretchr/testify/suite" + "github.com/dymensionxyz/dymension/v3/app/apptesting" "github.com/dymensionxyz/dymension/v3/x/streamer/types" sdk "github.com/cosmos/cosmos-sdk/types" @@ -24,6 +25,10 @@ func (suite *KeeperTestSuite) TestHookOperation() { err := suite.CreateGauge() suite.Require().NoError(err) + // we must create at least one lock, otherwise distribution won't work + lockOwner := apptesting.CreateRandomAccounts(1)[0] + suite.LockTokens(lockOwner, sdk.NewCoins(sdk.NewInt64Coin("stake", 100))) + // initial module streams check streams := suite.App.StreamerKeeper.GetNotFinishedStreams(suite.Ctx) suite.Require().Len(streams, 0) diff --git a/x/streamer/keeper/keeper.go b/x/streamer/keeper/keeper.go index faf4978d3..e2e771ba1 100644 --- a/x/streamer/keeper/keeper.go +++ b/x/streamer/keeper/keeper.go @@ -28,6 +28,7 @@ type Keeper struct { ak types.AccountKeeper ik types.IncentivesKeeper sk types.SponsorshipKeeper + lk types.LockupKeeper // epochPointers holds a mapping from the epoch identifier to EpochPointer. epochPointers collections.Map[string, types.EpochPointer] diff --git a/x/streamer/keeper/stream_iterator.go b/x/streamer/keeper/stream_iterator.go index df8d3f422..828b6f97a 100644 --- a/x/streamer/keeper/stream_iterator.go +++ b/x/streamer/keeper/stream_iterator.go @@ -11,7 +11,7 @@ func IterateEpochPointer( p types.EpochPointer, streams []types.Stream, maxIterations uint64, - cb func(v StreamGauge) pagination.Stop, + cb func(v StreamGauge) (stop bool, weight uint64), ) (types.EpochPointer, uint64) { iter := NewStreamIterator(streams, p.StreamId, p.GaugeId, p.EpochIdentifier) iterations := pagination.Paginate(iter, maxIterations, cb) diff --git a/x/streamer/keeper/stream_iterator_test.go b/x/streamer/keeper/stream_iterator_test.go index 9d802203e..c45e66fb9 100644 --- a/x/streamer/keeper/stream_iterator_test.go +++ b/x/streamer/keeper/stream_iterator_test.go @@ -5,7 +5,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/dymensionxyz/dymension/v3/internal/pagination" "github.com/dymensionxyz/dymension/v3/x/streamer/keeper" "github.com/dymensionxyz/dymension/v3/x/streamer/types" ) @@ -1292,9 +1291,9 @@ func TestStreamIterator(t *testing.T) { t.Parallel() var traversal [][2]uint64 - newPointer, iters := keeper.IterateEpochPointer(tc.pointer, tc.streams, tc.maxIters, func(v keeper.StreamGauge) pagination.Stop { + newPointer, iters := keeper.IterateEpochPointer(tc.pointer, tc.streams, tc.maxIters, func(v keeper.StreamGauge) (stop bool, weight uint64) { traversal = append(traversal, [2]uint64{v.Stream.Id, v.Gauge.GaugeId}) - return pagination.Continue + return false, 1 }) require.Equal(t, tc.expectedIters, iters) diff --git a/x/streamer/keeper/suite_test.go b/x/streamer/keeper/suite_test.go index 0ee013d8f..426c8ddaf 100644 --- a/x/streamer/keeper/suite_test.go +++ b/x/streamer/keeper/suite_test.go @@ -1,7 +1,6 @@ package keeper_test import ( - "slices" "testing" "time" @@ -67,7 +66,7 @@ func (suite *KeeperTestSuite) CreateGauge() error { suite.App.AccountKeeper.GetModuleAddress(types.ModuleName), sdk.Coins{}, lockuptypes.QueryCondition{ - LockQueryType: lockuptypes.ByTime, + LockQueryType: lockuptypes.ByDuration, Denom: "stake", Duration: time.Hour, Timestamp: time.Time{}, @@ -228,20 +227,36 @@ func (suite *KeeperTestSuite) Delegate(delAddr sdk.AccAddress, valAddr sdk.ValAd } func (suite *KeeperTestSuite) DistributeAllRewards(streams []types.Stream) sdk.Coins { - rewards := sdk.Coins{} - suite.Require().True(slices.IsSortedFunc(streams, keeper.CmpStreams)) - for _, stream := range streams { - epoch := suite.App.EpochsKeeper.GetEpochInfo(suite.Ctx, stream.DistrEpochIdentifier) - res := suite.App.StreamerKeeper.DistributeRewards( - suite.Ctx, - types.NewEpochPointer(epoch.Identifier, epoch.Duration), - types.IterationsNoLimit, - []types.Stream{stream}, - ) - suite.Require().Len(res.FilledStreams, 1) - err := suite.App.StreamerKeeper.SetStream(suite.Ctx, &res.FilledStreams[0]) - suite.Require().NoError(err) - rewards = rewards.Add(res.DistributedCoins...) - } - return rewards + // We must create at least one lock, otherwise distribution won't work + lockOwner := apptesting.CreateRandomAccounts(1)[0] + suite.LockTokens(lockOwner, sdk.NewCoins(sdk.NewInt64Coin("stake", 100))) + + err := suite.App.StreamerKeeper.BeforeEpochStart(suite.Ctx, "day") + suite.Require().NoError(err) + coins, err := suite.App.StreamerKeeper.AfterEpochEnd(suite.Ctx, "day") + suite.Require().NoError(err) + //rewards := sdk.Coins{} + //suite.Require().True(slices.IsSortedFunc(streams, keeper.CmpStreams)) + //for _, stream := range streams { + // epoch := suite.App.EpochsKeeper.GetEpochInfo(suite.Ctx, stream.DistrEpochIdentifier) + // res := suite.App.StreamerKeeper.CalculateRewards( + // suite.Ctx, + // types.NewEpochPointer(epoch.Identifier, epoch.Duration), + // types.IterationsNoLimit, + // []types.Stream{stream}, + // ) + // suite.Require().Len(res.FilledStreams, 1) + // err := suite.App.StreamerKeeper.SetStream(suite.Ctx, &res.FilledStreams[0]) + // suite.Require().NoError(err) + // rewards = rewards.Add(res.DistributedCoins...) + //} + //return rewards + return coins +} + +// LockTokens locks tokens for the specified duration +func (suite *KeeperTestSuite) LockTokens(addr sdk.AccAddress, coins sdk.Coins) { + suite.FundAcc(addr, coins) + _, err := suite.App.LockupKeeper.CreateLock(suite.Ctx, addr, coins, time.Hour) + suite.Require().NoError(err) } diff --git a/x/streamer/types/expected_keepers.go b/x/streamer/types/expected_keepers.go index f2326419a..abefb4597 100644 --- a/x/streamer/types/expected_keepers.go +++ b/x/streamer/types/expected_keepers.go @@ -16,6 +16,7 @@ import ( type BankKeeper interface { GetBalance(ctx sdk.Context, addr sdk.AccAddress, denom string) sdk.Coin GetAllBalances(ctx sdk.Context, addr sdk.AccAddress) sdk.Coins + SendCoinsFromModuleToModule(ctx sdk.Context, senderModule, recipientModule string, amt sdk.Coins) error } // EpochKeeper defines the expected interface needed to retrieve epoch info. @@ -38,6 +39,12 @@ type IncentivesKeeper interface { GetGaugeByID(ctx sdk.Context, gaugeID uint64) (*incentivestypes.Gauge, error) AddToGaugeRewards(ctx sdk.Context, owner sdk.AccAddress, coins sdk.Coins, gaugeID uint64) error + + Distribute(ctx sdk.Context, gauges []incentivestypes.Gauge) (sdk.Coins, error) +} + +type LockupKeeper interface { + GetDenomLockNum(ctx sdk.Context, denom string) (uint64, error) } type SponsorshipKeeper interface {