Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streamer): distribute rewards immediately in the current block #1173

Merged
merged 31 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3850352
fix(streamer): added streamer pagination
keruch Aug 13, 2024
1a5cd55
feat(streamer): added stream iterator
keruch Aug 14, 2024
7a1efa7
feat(streamer): EndBlocker
keruch Aug 15, 2024
b6b07f2
feat(streamer): AfterEpochEnd + tests
keruch Aug 16, 2024
878e9bc
feat(streamer): fixed module tests
keruch Aug 17, 2024
7a93f46
feat(streamer): EndBlocker tests
keruch Aug 19, 2024
d382d7b
feat(streamer): EndBlocker tests 2
keruch Aug 19, 2024
3217092
feat(streamer): EndBlocker tests 3
keruch Aug 19, 2024
cab303b
rebase + wip: threads
keruch Aug 29, 2024
43eaa69
upgate hanlder + threads 2
keruch Aug 30, 2024
7d748af
linter
keruch Aug 30, 2024
a1e983b
threads 3
keruch Sep 3, 2024
54f08aa
Merge branch 'main' into kirill/1010-streamer-pagination
keruch Sep 3, 2024
2b4b107
Merge branch 'main' into kirill/1010-streamer-pagination
keruch Sep 9, 2024
0e287bf
threads
keruch Sep 9, 2024
3a18dc0
upgate hanlder + threads 2
keruch Aug 30, 2024
b8d542d
feat(streamer): distribute rewards immediately in the current block
keruch Sep 2, 2024
bf971ca
updated x/incentives
keruch Sep 3, 2024
209b862
added unit tests
keruch Sep 3, 2024
1541ec3
docstrings
keruch Sep 3, 2024
ef3ddd8
threads
keruch Sep 9, 2024
55ef675
threads 2
keruch Sep 9, 2024
24f60ef
threads 2
keruch Sep 10, 2024
a4baf0b
Merge branch 'main' into kirill/354-streamer-distributes-immediately
keruch Sep 12, 2024
d60d581
merge commit
keruch Sep 12, 2024
4042cf3
small fix
keruch Sep 12, 2024
ffb7351
Merge branch 'main' into kirill/354-streamer-distributes-immediately
keruch Sep 16, 2024
90d79e3
merge commit
keruch Sep 16, 2024
34efd4b
threads 3
keruch Sep 17, 2024
0f4d284
error strings
keruch Sep 17, 2024
897e8ed
threads
keruch Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/keepers/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ func (a *AppKeepers) SetupHooks() {
a.EpochsKeeper.SetHooks(
epochstypes.NewMultiEpochHooks(
// insert epochs hooks receivers here
a.StreamerKeeper.Hooks(), // x/streamer must be before x/incentives
a.IncentivesKeeper.Hooks(),
a.StreamerKeeper.Hooks(),
a.TxFeesKeeper.Hooks(),
a.DelayedAckKeeper.GetEpochHooks(),
a.DymNSKeeper.GetEpochHooks(),
Expand Down
2 changes: 1 addition & 1 deletion proto/dymensionxyz/dymension/streamer/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ message Stream {
uint64 id = 1;

// distribute_to is the distr_info.
DistrInfo distribute_to = 2;
DistrInfo distribute_to = 2 [ (gogoproto.nullable) = false ];

// coins is the total amount of coins that have been in the stream
// Can distribute multiple coin denoms
Expand Down
93 changes: 93 additions & 0 deletions utils/cache/ordered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package cache

import "fmt"

// InsertionOrdered is a cache that preserves the insertion order of its elements.
// It maps keys of type K to values of type V and ensures that elements are iterated
// in the order they were inserted. This implementation is NOT thread-safe.
type InsertionOrdered[K comparable, V any] struct {
zale144 marked this conversation as resolved.
Show resolved Hide resolved
key func(V) K // A function that generates the key from a value
nextIdx int // The index to assign to the next inserted element
keyToIdx map[K]int // Maps keys to their index in the idxToValue slice
idxToValue []V // Stores values in the order they were inserted
}

// NewInsertionOrdered creates and returns a new InsertionOrdered cache.
// It accepts a key function, which extracts a key of type K from a value of type V.
// Optionally, you can pass initial values to be inserted into the cache.
func NewInsertionOrdered[K comparable, V any](key func(V) K, initial ...V) *InsertionOrdered[K, V] {
cache := &InsertionOrdered[K, V]{
key: key,
nextIdx: 0,
keyToIdx: make(map[K]int, len(initial)),
idxToValue: make([]V, 0, len(initial)),
}
// Insert the initial values (if any) into the cache
cache.Upsert(initial...)
return cache
}

// Upsert inserts or updates one or more values in the cache.
// If a value with the same key already exists, it will be updated.
// If the key is new, the value will be appended while preserving the insertion order.
func (c *InsertionOrdered[K, V]) Upsert(values ...V) {
for _, value := range values {
c.upsert(value)
}
}

// upsert is an internal helper method that inserts or updates a single value.
// It extracts the key from the value, checks if it already exists in the cache,
// and updates the value if found. If not, it appends the new value to the cache.
func (c *InsertionOrdered[K, V]) upsert(value V) {
key := c.key(value)
idx, ok := c.keyToIdx[key]
if ok {
// If the key already exists, update the value
c.idxToValue[idx] = value
} else {
// If the key does not exist, add a new entry
idx = c.nextIdx
c.nextIdx++
c.keyToIdx[key] = idx
c.idxToValue = append(c.idxToValue, value)
}
}

// Get retrieves a value from the cache by its key.
// It returns the value and a boolean indicating whether the key was found.
// If the key does not exist, it returns the zero value of type V and false.
func (c *InsertionOrdered[K, V]) Get(key K) (zero V, found bool) {
idx, ok := c.keyToIdx[key]
if ok {
return c.idxToValue[idx], true
}
return zero, false
}

// MustGet is Get that panics when the key is not found.
func (c *InsertionOrdered[K, V]) MustGet(key K) V {
value, ok := c.Get(key)
if ok {
return value
}
panic(fmt.Errorf("internal contract error: key is not found in the cache: %v", key))
}

// GetAll returns all values currently stored in the cache in their insertion order.
// This allows you to retrieve all values while preserving the order in which they were added.
func (c *InsertionOrdered[K, V]) GetAll() []V {
return c.idxToValue
}

// Range iterates over the values in the cache in their insertion order.
// The provided function f is called for each value. If f returns true, the iteration stops early.
// This method allows for efficient traversal without needing to copy the entire cache.
func (c *InsertionOrdered[K, V]) Range(f func(V) bool) {
for _, value := range c.idxToValue {
stop := f(value)
if stop {
return
}
}
}
163 changes: 163 additions & 0 deletions utils/cache/ordered_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package cache_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/dymensionxyz/dymension/v3/utils/cache"
)

// Test struct to use in the cache
type testStruct struct {
ID int
Name string
}

// Key extraction function for testStruct
func keyFunc(val testStruct) int {
return val.ID
}

func TestInsertionOrdered_Upsert(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
upserts []testStruct
expectedValues []testStruct
}{
{
name: "Insert single item",
initial: []testStruct{},
upserts: []testStruct{{ID: 1, Name: "Item 1"}},
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}},
},
{
name: "Insert multiple items",
initial: []testStruct{},
upserts: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
},
{
name: "Update existing item",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
upserts: []testStruct{{ID: 1, Name: "Updated Item 1"}},
expectedValues: []testStruct{{ID: 1, Name: "Updated Item 1"}},
},
{
name: "Insert and update items",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
upserts: []testStruct{{ID: 2, Name: "Item 2"}, {ID: 1, Name: "Updated Item 1"}, {ID: 0, Name: "Item 0"}},
expectedValues: []testStruct{{ID: 1, Name: "Updated Item 1"}, {ID: 2, Name: "Item 2"}, {ID: 0, Name: "Item 0"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
c.Upsert(tc.upserts...)

// Validate that the cache contains the expected values in the correct order
require.Equal(t, tc.expectedValues, c.GetAll())
})
}
}

func TestInsertionOrdered_Get(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
getID int
expectedVal testStruct
found bool
}{
{
name: "Get existing item",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
getID: 1,
expectedVal: testStruct{ID: 1, Name: "Item 1"},
found: true,
},
{
name: "Get non-existing item",
initial: []testStruct{{ID: 1, Name: "Item 1"}},
getID: 2,
expectedVal: testStruct{},
found: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
val, found := c.Get(tc.getID)

require.Equal(t, tc.found, found)
require.Equal(t, tc.expectedVal, val)
})
}
}

func TestInsertionOrdered_GetAll(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
expectedValues []testStruct
}{
{
name: "Get all from empty cache",
initial: []testStruct{},
expectedValues: []testStruct{},
},
{
name: "Get all from non-empty cache",
initial: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
allValues := c.GetAll()

require.Equal(t, tc.expectedValues, allValues)
})
}
}

func TestInsertionOrdered_Range(t *testing.T) {
testCases := []struct {
name string
initial []testStruct
stopID int
expectedValues []testStruct
}{
{
name: "Range over all values",
initial: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
stopID: -1,
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
},
{
name: "Stop at specific value",
initial: []testStruct{{ID: 1, Name: "Item 1"}, {ID: 2, Name: "Item 2"}},
stopID: 1,
expectedValues: []testStruct{{ID: 1, Name: "Item 1"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := cache.NewInsertionOrdered(keyFunc, tc.initial...)
var collected []testStruct

c.Range(func(v testStruct) bool {
collected = append(collected, v)
return v.ID == tc.stopID
})

require.Equal(t, tc.expectedValues, collected)
})
}
}
35 changes: 17 additions & 18 deletions utils/pagination/paginate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@ type Iterator[T any] interface {
Valid() bool
}

type Stop bool

const (
Break Stop = true
Continue Stop = false
)

// Paginate is a function that paginates over an iterator. The callback is executed for each iteration and if it
// returns true, the pagination stops. The function returns the amount of iterations before stopping.
func Paginate[T any](iter Iterator[T], perPage uint64, cb func(T) Stop) uint64 {
iterations := uint64(0)
for ; iterations < perPage && iter.Valid(); iter.Next() {
iterations++

stop := cb(iter.Value())
if stop {
break
}
// returns true, the pagination stops. The callback also returns the number of operations performed during the call.
// That is, one iteration may be complex and thus return >1 operation num. For example, in case if the called decides
// that the iteration is heavy or time-consuming. Paginate also allows to specify the maximum number of operations
// that may be accumulated during the execution. If this number is exceeded, then Paginate exits.
// The function returns the total number of operations performed before stopping.
func Paginate[T any](
iter Iterator[T],
maxOperations uint64,
cb func(T) (stop bool, operations uint64),
) uint64 {
totalOperations := uint64(0)
stop := false
for ; !stop && totalOperations < maxOperations && iter.Valid(); iter.Next() {
var operations uint64
stop, operations = cb(iter.Value())
totalOperations += operations
}
return iterations
return totalOperations
}
Loading
Loading