diff --git a/blockchain/client.go b/blockchain/client.go index d4ad6a9..224bc7c 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -3,7 +3,6 @@ package blockchain import ( "context" "fmt" - "math" "sync" "sync/atomic" @@ -19,7 +18,7 @@ type EventsListener func(events *pallets.Events, blockNumber types.BlockNumber, type Client struct { *gsrpc.SubstrateAPI - eventsListeners map[int]EventsListener + eventsListeners map[*EventsListener]struct{} mu sync.Mutex isListening uint32 cancelListening func() @@ -43,7 +42,7 @@ func NewClient(url string) (*Client, error) { return &Client{ SubstrateAPI: substrateApi, - eventsListeners: make(map[int]EventsListener), + eventsListeners: make(map[*EventsListener]struct{}), DdcClusters: pallets.NewDdcClustersApi(substrateApi), DdcCustomers: pallets.NewDdcCustomersApi(substrateApi, meta), DdcNodes: pallets.NewDdcNodesApi(substrateApi, meta), @@ -85,8 +84,8 @@ func (c *Client) StartEventsListening() (context.CancelFunc, <-chan error, error set.Block, func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { c.mu.Lock() - for _, callback := range c.eventsListeners { - go callback(events, blockNumber, blockHash) + for callback := range c.eventsListeners { + go (*callback)(events, blockNumber, blockHash) } c.mu.Unlock() }, @@ -115,7 +114,8 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events pendingEvents := &pendingEvents{} subscriptionStartBlock := uint32(0) subscriptionStarted := make(chan struct{}) - callbackWrapper := func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { + + callbackWrapper := EventsListener(func(events *pallets.Events, blockNumber types.BlockNumber, blockHash types.Hash) { if atomic.CompareAndSwapUint32(&subscriptionStartBlock, 0, uint32(blockNumber)) { close(subscriptionStarted) } @@ -125,21 +125,10 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events } callback(events, blockNumber, blockHash) - } + }) c.mu.Lock() - // Find the smallest available key for the new listener. - var idx int - for i := 0; i <= math.MaxInt; i++ { - if _, ok := c.eventsListeners[i]; !ok { - idx = i - break - } - if i == math.MaxInt { - return nil, fmt.Errorf("too many events listeners") - } - } - c.eventsListeners[idx] = callbackWrapper + c.eventsListeners[&callbackWrapper] = struct{}{} c.mu.Unlock() cancelled := false @@ -202,7 +191,7 @@ func (c *Client) RegisterEventsListener(begin types.BlockNumber, callback Events once.Do(func() { c.mu.Lock() cancelled = true - delete(c.eventsListeners, idx) + delete(c.eventsListeners, &callbackWrapper) c.mu.Unlock() }) }