Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streamer): distribute rewards immediately in the current block #1173

Merged
merged 31 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3850352
fix(streamer): added streamer pagination
keruch Aug 13, 2024
1a5cd55
feat(streamer): added stream iterator
keruch Aug 14, 2024
7a1efa7
feat(streamer): EndBlocker
keruch Aug 15, 2024
b6b07f2
feat(streamer): AfterEpochEnd + tests
keruch Aug 16, 2024
878e9bc
feat(streamer): fixed module tests
keruch Aug 17, 2024
7a93f46
feat(streamer): EndBlocker tests
keruch Aug 19, 2024
d382d7b
feat(streamer): EndBlocker tests 2
keruch Aug 19, 2024
3217092
feat(streamer): EndBlocker tests 3
keruch Aug 19, 2024
cab303b
rebase + wip: threads
keruch Aug 29, 2024
43eaa69
upgate hanlder + threads 2
keruch Aug 30, 2024
7d748af
linter
keruch Aug 30, 2024
a1e983b
threads 3
keruch Sep 3, 2024
54f08aa
Merge branch 'main' into kirill/1010-streamer-pagination
keruch Sep 3, 2024
2b4b107
Merge branch 'main' into kirill/1010-streamer-pagination
keruch Sep 9, 2024
0e287bf
threads
keruch Sep 9, 2024
3a18dc0
upgate hanlder + threads 2
keruch Aug 30, 2024
b8d542d
feat(streamer): distribute rewards immediately in the current block
keruch Sep 2, 2024
bf971ca
updated x/incentives
keruch Sep 3, 2024
209b862
added unit tests
keruch Sep 3, 2024
1541ec3
docstrings
keruch Sep 3, 2024
ef3ddd8
threads
keruch Sep 9, 2024
55ef675
threads 2
keruch Sep 9, 2024
24f60ef
threads 2
keruch Sep 10, 2024
a4baf0b
Merge branch 'main' into kirill/354-streamer-distributes-immediately
keruch Sep 12, 2024
d60d581
merge commit
keruch Sep 12, 2024
4042cf3
small fix
keruch Sep 12, 2024
ffb7351
Merge branch 'main' into kirill/354-streamer-distributes-immediately
keruch Sep 16, 2024
90d79e3
merge commit
keruch Sep 16, 2024
34efd4b
threads 3
keruch Sep 17, 2024
0f4d284
error strings
keruch Sep 17, 2024
897e8ed
threads
keruch Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/keepers/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ func (a *AppKeepers) SetupHooks() {
a.EpochsKeeper.SetHooks(
epochstypes.NewMultiEpochHooks(
// insert epochs hooks receivers here
a.StreamerKeeper.Hooks(), // x/streamer must be before x/incentives
a.IncentivesKeeper.Hooks(),
a.StreamerKeeper.Hooks(),
a.TxFeesKeeper.Hooks(),
a.DelayedAckKeeper.GetEpochHooks(),
a.DymNSKeeper.GetEpochHooks(),
Expand Down
2 changes: 1 addition & 1 deletion proto/dymensionxyz/dymension/streamer/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ message Stream {
uint64 id = 1;

// distribute_to is the distr_info.
DistrInfo distribute_to = 2;
DistrInfo distribute_to = 2 [ (gogoproto.nullable) = false ];

// coins is the total amount of coins that have been in the stream
// Can distribute multiple coin denoms
Expand Down
72 changes: 72 additions & 0 deletions utils/cache/linked_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package cache

// List is a double linked list with the limited number of methods.
// NOT thread safe!
type List[T any] struct {
first *Node[T]
last *Node[T]
}

func NewList[T any]() *List[T] {
return &List[T]{
first: nil,
last: nil,
}
}

// Node is an element of doubly linked list.
// NOT thread safe!
type Node[T any] struct {
elem T
prev *Node[T]
next *Node[T]
list *List[T]
}

// Insert inserts new node to the end of the list.
// NOT thread safe!
func (l *List[T]) Insert(elem T) *Node[T] {
n := &Node[T]{
elem: elem,
prev: l.last,
next: nil,
list: l,
}
if l.last == nil {
l.first = n
} else {
l.last.next = n
}
l.last = n
return n
}

// Delete removes node from the list.
// NOT thread safe!
func (n *Node[T]) Delete() {
if n.list.first == n {
n.list.first = n.next
}
if n.list.last == n {
n.list.last = n.prev
}
if n.next != nil {
n.next.prev = n.prev
}
if n.prev != nil {
n.prev.next = n.next
}
n.next, n.prev = nil, nil
}

// Range loops over all elements and calls f with each of them.
// NOT thread safe!
func (l *List[T]) Range(f func(T) bool) {
n := l.first
for n != nil {
if !f(n.elem) {
break

Check warning on line 68 in utils/cache/linked_list.go

View check run for this annotation

Codecov / codecov/patch

utils/cache/linked_list.go#L68

Added line #L68 was not covered by tests
}
n = n.next
}
}
173 changes: 173 additions & 0 deletions utils/cache/linked_list_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package cache

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestList(t *testing.T) {
t.Parallel()

t.Run("empty", func(t *testing.T) {
t.Parallel()
l := NewList[int]()
assertForEach(t, l, nil)
})
t.Run("insert 1 element", func(t *testing.T) {
t.Parallel()
l := NewList[int]()

a := l.Insert(10)

assertForEach(t, l, []int{10})
assert.Same(t, a, l.first)
assert.Same(t, a, l.last)
assert.Nil(t, a.prev)
assert.Nil(t, a.next)
})
t.Run("delete 1 element", func(t *testing.T) {
t.Parallel()
l := NewList[int]()
a := l.Insert(10)

a.Delete()

assertForEach(t, l, nil)
assert.Nil(t, l.first)
assert.Nil(t, l.last)
assert.Nil(t, a.prev)
assert.Nil(t, a.next)
})
t.Run("insert 2 elements", func(t *testing.T) {
t.Parallel()
l := NewList[int]()

a := l.Insert(10)
b := l.Insert(20)

assertForEach(t, l, []int{10, 20})
assert.Same(t, a, l.first)
assert.Same(t, b, l.last)
assert.Nil(t, a.prev)
assert.Same(t, b, a.next)
assert.Same(t, a, b.prev)
assert.Nil(t, b.next)
})
t.Run("delete last from 2 elements", func(t *testing.T) {
t.Parallel()
l := NewList[int]()
a := l.Insert(10)
b := l.Insert(20)

b.Delete()

assertForEach(t, l, []int{10})
assert.Same(t, a, l.first)
assert.Same(t, a, l.last)
assert.Nil(t, a.prev)
assert.Nil(t, a.next)
assert.Nil(t, b.prev)
assert.Nil(t, b.next)
})
t.Run("delete first from 2 elements", func(t *testing.T) {
t.Parallel()
l := NewList[int]()
a := l.Insert(10)
b := l.Insert(20)

a.Delete()

assertForEach(t, l, []int{20})
assert.Same(t, b, l.first)
assert.Same(t, b, l.last)
assert.Nil(t, a.prev)
assert.Nil(t, a.next)
assert.Nil(t, b.prev)
assert.Nil(t, b.next)
})
t.Run("insert 3 elements", func(t *testing.T) {
t.Parallel()
l := NewList[int]()

a := l.Insert(10)
b := l.Insert(20)
c := l.Insert(30)

assertForEach(t, l, []int{10, 20, 30})
assert.Same(t, a, l.first)
assert.Same(t, c, l.last)
assert.Nil(t, a.prev)
assert.Same(t, b, a.next)
assert.Same(t, a, b.prev)
assert.Same(t, c, b.next)
assert.Same(t, b, c.prev)
assert.Nil(t, c.next)
})
t.Run("delete last from 3 elements", func(t *testing.T) {
t.Parallel()
l := NewList[int]()
a := l.Insert(10)
b := l.Insert(20)
c := l.Insert(30)

c.Delete()

assertForEach(t, l, []int{10, 20})
assert.Same(t, a, l.first)
assert.Same(t, b, l.last)
assert.Nil(t, a.prev)
assert.Same(t, b, a.next)
assert.Same(t, a, b.prev)
assert.Nil(t, b.next)
assert.Nil(t, c.prev)
assert.Nil(t, c.next)
})
t.Run("delete first from 3 elements", func(t *testing.T) {
t.Parallel()
l := NewList[int]()
a := l.Insert(10)
b := l.Insert(20)
c := l.Insert(30)

a.Delete()

assertForEach(t, l, []int{20, 30})
assert.Same(t, b, l.first)
assert.Same(t, c, l.last)
assert.Nil(t, a.prev)
assert.Nil(t, a.next)
assert.Nil(t, b.prev)
assert.Same(t, c, b.next)
assert.Same(t, b, c.prev)
assert.Nil(t, c.next)
})
t.Run("delete middle from 3 elements", func(t *testing.T) {
t.Parallel()
l := NewList[int]()
a := l.Insert(10)
b := l.Insert(20)
c := l.Insert(30)

b.Delete()

assertForEach(t, l, []int{10, 30})
assert.Same(t, a, l.first)
assert.Same(t, c, l.last)
assert.Nil(t, a.prev)
assert.Same(t, c, a.next)
assert.Nil(t, b.prev)
assert.Nil(t, b.next)
assert.Same(t, a, c.prev)
assert.Nil(t, c.next)
})
}

func assertForEach[T any](t *testing.T, l *List[T], expected []T) {
var actual []T
l.Range(func(v T) bool {
actual = append(actual, v)
return true
})
assert.Equal(t, expected, actual)
}
99 changes: 99 additions & 0 deletions utils/cache/ordered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package cache

// InsertionOrdered cache preserves insertion-order.
// NOT thread safe!
type InsertionOrdered[K comparable, V any] struct {
zale144 marked this conversation as resolved.
Show resolved Hide resolved
dataIdx map[K]*Node[V]
key func(V) K
data *List[V]
}

func NewInsertionOrdered[K comparable, V any](key func(V) K, initial ...V) *InsertionOrdered[K, V] {
cache := &InsertionOrdered[K, V]{
dataIdx: make(map[K]*Node[V], 0),
key: key,
data: NewList[V](),
}
cache.UpsertMultiple(initial...)
return cache
}

func (c *InsertionOrdered[K, V]) Reset(values ...V) {
c.dataIdx = make(map[K]*Node[V], len(values))
c.data = NewList[V]()
c.UpsertMultiple(values...)
}

func (c *InsertionOrdered[K, V]) Upsert(value V) {
key := c.key(value)
v, ok := c.dataIdx[key]
if ok {
// if the value is present, just update it
v.elem = value
} else {
// otherwise, create a new one
c.dataIdx[key] = c.data.Insert(value)
}
}

func (c *InsertionOrdered[K, V]) UpsertMultiple(values ...V) {
for _, value := range values {
c.Upsert(value)
}
}

func (c *InsertionOrdered[K, V]) Delete(keys ...K) {
for _, key := range keys {
if value, ok := c.dataIdx[key]; ok {
value.Delete()
}
delete(c.dataIdx, key)
}
}

func (c *InsertionOrdered[K, V]) Get(key K) (V, bool) {
value, ok := c.dataIdx[key]
if ok {
return value.elem, true
}
var zero V
return zero, false
}

func (c *InsertionOrdered[K, V]) GetAll() []V {
res := make([]V, 0, len(c.dataIdx))
c.data.Range(func(v V) bool {
res = append(res, v)
return true
})
return res
}

func (c *InsertionOrdered[K, V]) Filter(condition func(V) bool) []V {
var res []V
c.data.Range(func(v V) bool {
if condition(v) {
res = append(res, v)
}
return true

Check warning on line 78 in utils/cache/ordered.go

View check run for this annotation

Codecov / codecov/patch

utils/cache/ordered.go#L72-L78

Added lines #L72 - L78 were not covered by tests
})
return res

Check warning on line 80 in utils/cache/ordered.go

View check run for this annotation

Codecov / codecov/patch

utils/cache/ordered.go#L80

Added line #L80 was not covered by tests
}

func (c *InsertionOrdered[K, V]) FindFirst(condition func(V) bool) (V, bool) {
var res V
var found bool
c.data.Range(func(v V) bool {
if condition(v) {
res = v
found = true
return false
}
return true

Check warning on line 92 in utils/cache/ordered.go

View check run for this annotation

Codecov / codecov/patch

utils/cache/ordered.go#L83-L92

Added lines #L83 - L92 were not covered by tests
})
return res, found

Check warning on line 94 in utils/cache/ordered.go

View check run for this annotation

Codecov / codecov/patch

utils/cache/ordered.go#L94

Added line #L94 was not covered by tests
}

func (c *InsertionOrdered[K, V]) Range(f func(V) bool) {
c.data.Range(f)
}
Loading
Loading