Skip to content

Commit

Permalink
Close errors channel on events listening canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Apr 12, 2024
1 parent 1ebedce commit 196c520
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ func (c *Client) StartEventsListening(

liveChangesC := sub.Chan()
histChangesC := make(chan types.StorageChangeSet)
var wg sync.WaitGroup

// Query historical changes.
var cancelled atomic.Value
cancelled.Store(false)
wg.Add(1)
go func(begin types.BlockNumber, liveChanges <-chan types.StorageChangeSet, histChangesC chan types.StorageChangeSet) {
defer wg.Done()
defer close(histChangesC)

set := <-liveChanges // first live changes set block is the last historical block
Expand Down Expand Up @@ -123,7 +126,9 @@ func (c *Client) StartEventsListening(

// Sequence historical and live changes.
changesC := make(chan types.StorageChangeSet)
wg.Add(1)
go func(histChangesC, liveChangesC <-chan types.StorageChangeSet, changesC chan types.StorageChangeSet) {
defer wg.Done()
defer close(changesC)

for set := range histChangesC {
Expand All @@ -137,7 +142,9 @@ func (c *Client) StartEventsListening(

// Decode events from changes skipping blocks before 'begin'.
eventsC := make(chan blockEvents)
wg.Add(1)
go func(changesC <-chan types.StorageChangeSet, eventsC chan blockEvents) {
defer wg.Done()
defer close(eventsC)

for set := range changesC {
Expand Down Expand Up @@ -190,6 +197,8 @@ func (c *Client) StartEventsListening(
once.Do(func() {
sub.Unsubscribe()
cancelled.Store(true)
wg.Wait()
close(c.errsListening)
c.isListening = 0
})
}
Expand Down

0 comments on commit 196c520

Please sign in to comment.