Skip to content

Commit

Permalink
feat(streamer): distribute rewards immediately in the current block (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
keruch authored Sep 20, 2024
1 parent 5a3e0a8 commit 14877e9
Show file tree
Hide file tree
Showing 43 changed files with 2,146 additions and 606 deletions.
2 changes: 1 addition & 1 deletion app/keepers/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ func (a *AppKeepers) SetupHooks() {
a.EpochsKeeper.SetHooks(
epochstypes.NewMultiEpochHooks(
// insert epochs hooks receivers here
a.StreamerKeeper.Hooks(), // x/streamer must be before x/incentives
a.IncentivesKeeper.Hooks(),
a.StreamerKeeper.Hooks(),
a.TxFeesKeeper.Hooks(),
a.DelayedAckKeeper.GetEpochHooks(),
a.DymNSKeeper.GetEpochHooks(),
Expand Down
2 changes: 1 addition & 1 deletion proto/dymensionxyz/dymension/streamer/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ message Stream {
uint64 id = 1;

// distribute_to is the distr_info.
DistrInfo distribute_to = 2;
DistrInfo distribute_to = 2 [ (gogoproto.nullable) = false ];

// coins is the total amount of coins that have been in the stream
// Can distribute multiple coin denoms
Expand Down
93 changes: 93 additions & 0 deletions utils/cache/ordered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package cache

import "fmt"

// InsertionOrdered is a cache that preserves the insertion order of its elements.
// It maps keys of type K to values of type V and ensures that elements are iterated
// in the order they were inserted. This implementation is NOT thread-safe.
type InsertionOrdered[K comparable, V any] struct {
key func(V) K // A function that generates the key from a value
nextIdx int // The index to assign to the next inserted element
keyToIdx map[K]int // Maps keys to their index in the idxToValue slice
idxToValue []V // Stores values in the order they were inserted
}

// NewInsertionOrdered creates and returns a new InsertionOrdered cache.
// It accepts a key function, which extracts a key of type K from a value of type V.
// Optionally, you can pass initial values to be inserted into the cache.
func NewInsertionOrdered[K comparable, V any](key func(V) K, initial ...V) *InsertionOrdered[K, V] {
cache := &InsertionOrdered[K, V]{
key: key,
nextIdx: 0,
keyToIdx: make(map[K]int, len(initial)),
idxToValue: make([]V, 0, len(initial)),
}
// Insert the initial values (if any) into the cache
cache.Upsert(initial...)
return cache
}

// Upsert inserts or updates one or more values in the cache.
// If a value with the same key already exists, it will be updated.
// If the key is new, the value will be appended while preserving the insertion order.
func (c *InsertionOrdered[K, V]) Upsert(values ...V) {
for _, value := range values {
c.upsert(value)
}
}

// upsert is an internal helper method that inserts or updates a single value.
// It extracts the key from the value, checks if it already exists in the cache,
// and updates the value if found. If not, it appends the new value to the cache.
func (c *InsertionOrdered[K, V]) upsert(value V) {
key := c.key(value)
idx, ok := c.keyToIdx[key]
if ok {
// If the key already exists, update the value
c.idxToValue[idx] = value
} else {
// If the key does not exist, add a new entry
idx = c.nextIdx
c.nextIdx++
c.keyToIdx[key] = idx
c.idxToValue = append(c.idxToValue, value)
}
}

// Get retrieves a value from the cache by its key.
// It returns the value and a boolean indicating whether the key was found.
// If the key does not exist, it returns the zero value of type V and false.
func (c *InsertionOrdered[K, V]) Get(key K) (zero V, found bool) {
idx, ok := c.keyToIdx[key]
if ok {
return c.idxToValue[idx], true
}
return zero, false
}

// MustGet is Get that panics when the key is not found.
func (c *InsertionOrdered[K, V]) MustGet(key K) V {
value, ok := c.Get(key)
if ok {
return value
}
panic(fmt.Errorf("internal contract error: key is not found in the cache: %v", key))
}

// GetAll returns all values currently stored in the cache in their insertion order.
// This allows you to retrieve all values while preserving the order in which they were added.
func (c *InsertionOrdered[K, V]) GetAll() []V {
return c.idxToValue
}

// Range iterates over the values in the cache in their insertion order.
// The provided function f is called for each value. If f returns true, the iteration stops early.
// This method allows for efficient traversal without needing to copy the entire cache.
func (c *InsertionOrdered[K, V]) Range(f func(V) bool) {
for _, value := range c.idxToValue {
stop := f(value)
if stop {
return
}
}
}
163 changes: 163 additions & 0 deletions utils/cache/ordered_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package cache_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/dymensionxyz/dymension/v3/utils/cache"
)

// Test struct to use in the cache
type testStruct struct {
ID int
Name string
}

// Key extraction function for testStruct
func keyFunc(val testStruct) int {
return val.ID
}

func TestInsertionOrdered_Upsert(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
upserts []testStruct
expectedValues []testStruct
}{
{
name: "Insert single item",
initial: []testStruct{},
upserts: []testStruct{{ID: 1, Name: "Item 1"}},
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}},
},
{
name: "Insert multiple items",
initial: []testStruct{},
upserts: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
},
{
name: "Update existing item",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
upserts: []testStruct{{ID: 1, Name: "Updated Item 1"}},
expectedValues: []testStruct{{ID: 1, Name: "Updated Item 1"}},
},
{
name: "Insert and update items",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
upserts: []testStruct{{ID: 2, Name: "Item 2"}, {ID: 1, Name: "Updated Item 1"}, {ID: 0, Name: "Item 0"}},
expectedValues: []testStruct{{ID: 1, Name: "Updated Item 1"}, {ID: 2, Name: "Item 2"}, {ID: 0, Name: "Item 0"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
c.Upsert(tc.upserts...)

// Validate that the cache contains the expected values in the correct order
require.Equal(t, tc.expectedValues, c.GetAll())
})
}
}

func TestInsertionOrdered_Get(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
getID int
expectedVal testStruct
found bool
}{
{
name: "Get existing item",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
getID: 1,
expectedVal: testStruct{ID: 1, Name: "Item 1"},
found: true,
},
{
name: "Get non-existing item",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
getID: 2,
expectedVal: testStruct{},
found: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
val, found := c.Get(tc.getID)

require.Equal(t, tc.found, found)
require.Equal(t, tc.expectedVal, val)
})
}
}

func TestInsertionOrdered_GetAll(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
expectedValues []testStruct
}{
{
name: "Get all from empty cache",
initial: []testStruct{},
expectedValues: []testStruct{},
},
{
name: "Get all from non-empty cache",
initial: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
allValues := c.GetAll()

require.Equal(t, tc.expectedValues, allValues)
})
}
}

func TestInsertionOrdered_Range(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
stopID int
expectedValues []testStruct
}{
{
name: "Range over all values",
initial: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
stopID: -1,
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
},
{
name: "Stop at specific value",
initial: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
stopID: 1,
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
var collected []testStruct

c.Range(func(v testStruct) bool {
collected = append(collected, v)
return v.ID == tc.stopID
})

require.Equal(t, tc.expectedValues, collected)
})
}
}
35 changes: 17 additions & 18 deletions utils/pagination/paginate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@ type Iterator[T any] interface {
Valid() bool
}

type Stop bool

const (
Break Stop = true
Continue Stop = false
)

// Paginate is a function that paginates over an iterator. The callback is executed for each iteration and if it
// returns true, the pagination stops. The function returns the amount of iterations before stopping.
func Paginate[T any](iter Iterator[T], perPage uint64, cb func(T) Stop) uint64 {
iterations := uint64(0)
for ; iterations < perPage && iter.Valid(); iter.Next() {
iterations++

stop := cb(iter.Value())
if stop {
break
}
// returns true, the pagination stops. The callback also returns the number of operations performed during the call.
// That is, one iteration may be complex and thus return >1 operation num. For example, in case if the called decides
// that the iteration is heavy or time-consuming. Paginate also allows to specify the maximum number of operations
// that may be accumulated during the execution. If this number is exceeded, then Paginate exits.
// The function returns the total number of operations performed before stopping.
func Paginate[T any](
iter Iterator[T],
maxOperations uint64,
cb func(T) (stop bool, operations uint64),
) uint64 {
totalOperations := uint64(0)
stop := false
for ; !stop && totalOperations < maxOperations && iter.Valid(); iter.Next() {
var operations uint64
stop, operations = cb(iter.Value())
totalOperations += operations
}
return iterations
return totalOperations
}
Loading

0 comments on commit 14877e9

Please sign in to comment.