Skip to content

Commit

Permalink
More optimal events listeners registry
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Mar 27, 2024
1 parent 9988c64 commit f9b1edc
Showing 1 changed file with 9 additions and 20 deletions.
29 changes: 9 additions & 20 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package blockchain
import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"

Expand All @@ -19,7 +18,7 @@ type EventsListener func(events *pallets.Events, blockNumber types.BlockNumber,
type Client struct {
*gsrpc.SubstrateAPI

eventsListeners map[int]EventsListener
eventsListeners map[*EventsListener]struct{}
mu sync.Mutex
isListening uint32
cancelListening func()
Expand All @@ -43,7 +42,7 @@ func NewClient(url string) (*Client, error) {

return &Client{
SubstrateAPI: substrateApi,
eventsListeners: make(map[int]EventsListener),
eventsListeners: make(map[*EventsListener]struct{}),
DdcClusters: pallets.NewDdcClustersApi(substrateApi),
DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta),
DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta),
Expand Down Expand Up @@ -85,8 +84,8 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error
set.Block,
func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) {
c.mu.Lock()
for _, callback := range c.eventsListeners {
go callback(events, blockNumber, blockHash)
for callback := range c.eventsListeners {
go (*callback)(events, blockNumber, blockHash)
}
c.mu.Unlock()
},
Expand Down Expand Up @@ -115,7 +114,8 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
pendingEvents := &pendingEvents{}
subscriptionStartBlock := uint32(0)
subscriptionStarted := make(chan struct{})
callbackWrapper := func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) {

callbackWrapper := EventsListener(func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) {
if atomic.CompareAndSwapUint32(&subscriptionStartBlock, 0, uint32(blockNumber)) {
close(subscriptionStarted)
}
Expand All @@ -125,21 +125,10 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
}

callback(events, blockNumber, blockHash)
}
})

c.mu.Lock()
// Find the smallest available key for the new listener.
var idx int
for i := 0; i <= math.MaxInt; i++ {
if _, ok := c.eventsListeners[i]; !ok {
idx = i
break
}
if i == math.MaxInt {
return nil, fmt.Errorf("too many events listeners")
}
}
c.eventsListeners[idx] = callbackWrapper
c.eventsListeners[&callbackWrapper] = struct{}{}
c.mu.Unlock()

cancelled := false
Expand Down Expand Up @@ -202,7 +191,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events
once.Do(func() {
c.mu.Lock()
cancelled = true
delete(c.eventsListeners, idx)
delete(c.eventsListeners, &callbackWrapper)
c.mu.Unlock()
})
}
Expand Down

0 comments on commit f9b1edc

Please sign in to comment.