Skip to content

Commit

Permalink
Merge pull request #392 from openziti/pool-stall
Browse files Browse the repository at this point in the history
Fix pool stall. Fixes #391
  • Loading branch information
plorenz authored Jan 17, 2024
2 parents 48ddf48 + 979bcff commit 30b8942
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/speps/go-hashids v2.0.0+incompatible
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20220921023135-46d9e7742f1e
golang.org/x/sys v0.15.0
golang.org/x/term v0.15.0
golang.org/x/sys v0.16.0
golang.org/x/term v0.16.0
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20220921023135-46d9e7742f1e h1:Ctm9yurWsg7aWwIpH9Bnap/IdSVxixymIb3MhiMEQQA=
golang.org/x/exp v0.0.0-20220921023135-46d9e7742f1e/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
24 changes: 14 additions & 10 deletions goroutines/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (self *PoolConfig) Validate() error {
if self.MinWorkers > self.MaxWorkers {
return fmt.Errorf("min workers must be less than or equal to max workers. min workers=%v, max workers=%v", self.MinWorkers, self.MaxWorkers)
}

return nil
}

Expand Down Expand Up @@ -129,13 +128,10 @@ func (self *pool) QueueWithTimeout(work func(), timeout time.Duration) error {
}

func (self *pool) queueImpl(work func(), timeoutC <-chan time.Time) error {
if self.GetWorkerCount() == 0 {
self.tryAddWorker()
}

select {
case self.queue <- work:
self.incrQueueSize()
self.ensureNoStarvation()
return nil
case <-self.closeNotify:
return errors.Wrap(PoolStoppedError, "cannot queue")
Expand All @@ -147,13 +143,10 @@ func (self *pool) queueImpl(work func(), timeoutC <-chan time.Time) error {
}

func (self *pool) QueueOrError(work func()) error {
if self.GetWorkerCount() == 0 {
self.tryAddWorker()
}

select {
case self.queue <- work:
self.incrQueueSize()
self.ensureNoStarvation()
return nil
case <-self.closeNotify:
return errors.Wrap(PoolStoppedError, "cannot queue")
Expand All @@ -164,6 +157,12 @@ func (self *pool) QueueOrError(work func()) error {
}
}

func (self *pool) ensureNoStarvation() {
if self.minWorkers == 0 && self.GetWorkerCount() == 0 {
self.tryAddWorker()
}
}

func (self *pool) Shutdown() {
if self.stopped.CompareAndSwap(false, true) {
close(self.closeNotify)
Expand All @@ -183,7 +182,12 @@ func (self *pool) worker(initialWork func()) {
}()

defer func() {
self.decrementCount()
// There's a small race condition where the last worker can exit due to idle
// right as something is queued. If we're the last worker, check again, just
// to be sure there's nothing queued.
if newCount := self.decrementCount(); newCount == 0 {
time.AfterFunc(100*time.Millisecond, self.startExtraWorkerIfQueueBusy)
}
}()

if initialWork != nil {
Expand Down

0 comments on commit 30b8942

Please sign in to comment.