From 979bcffc5598cfe07188bbbb61bbf9baa8757e3d Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Thu, 11 Jan 2024 15:09:18 -0500 Subject: [PATCH] Fix pool stall. Fixes #391 --- go.mod | 4 ++-- go.sum | 8 ++++---- goroutines/pool.go | 24 ++++++++++++++---------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index e99314f..df4bd44 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,8 @@ require ( github.com/speps/go-hashids v2.0.0+incompatible github.com/stretchr/testify v1.8.4 golang.org/x/exp v0.0.0-20220921023135-46d9e7742f1e - golang.org/x/sys v0.15.0 - golang.org/x/term v0.15.0 + golang.org/x/sys v0.16.0 + golang.org/x/term v0.16.0 ) require ( diff --git a/go.sum b/go.sum index 5cd2489..95dce99 100644 --- a/go.sum +++ b/go.sum @@ -19,10 +19,10 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/exp v0.0.0-20220921023135-46d9e7742f1e h1:Ctm9yurWsg7aWwIpH9Bnap/IdSVxixymIb3MhiMEQQA= golang.org/x/exp v0.0.0-20220921023135-46d9e7742f1e/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/goroutines/pool.go b/goroutines/pool.go index 55bcaea..4c1ebea 100644 --- a/goroutines/pool.go +++ b/goroutines/pool.go @@ -78,7 +78,6 @@ func (self *PoolConfig) Validate() error { if self.MinWorkers > self.MaxWorkers { return fmt.Errorf("min workers must be less than or equal to max workers. min workers=%v, max workers=%v", self.MinWorkers, self.MaxWorkers) } - return nil } @@ -129,13 +128,10 @@ func (self *pool) QueueWithTimeout(work func(), timeout time.Duration) error { } func (self *pool) queueImpl(work func(), timeoutC <-chan time.Time) error { - if self.GetWorkerCount() == 0 { - self.tryAddWorker() - } - select { case self.queue <- work: self.incrQueueSize() + self.ensureNoStarvation() return nil case <-self.closeNotify: return errors.Wrap(PoolStoppedError, "cannot queue") @@ -147,13 +143,10 @@ func (self *pool) queueImpl(work func(), timeoutC <-chan time.Time) error { } func (self *pool) QueueOrError(work func()) error { - if self.GetWorkerCount() == 0 { - self.tryAddWorker() - } - select { case self.queue <- work: self.incrQueueSize() + self.ensureNoStarvation() return nil case <-self.closeNotify: return errors.Wrap(PoolStoppedError, "cannot queue") @@ -164,6 +157,12 @@ func (self *pool) QueueOrError(work func()) error { } } +func (self *pool) ensureNoStarvation() { + if self.minWorkers == 0 && self.GetWorkerCount() == 0 { + self.tryAddWorker() + } +} + func (self *pool) Shutdown() { if self.stopped.CompareAndSwap(false, true) { close(self.closeNotify) @@ -183,7 +182,12 @@ func (self *pool) worker(initialWork func()) { }() defer func() { - self.decrementCount() + // There's a small race condition where the last worker can exit due to idle + // right as something is queued. If we're the last worker, check again, just + // to be sure there's nothing queued. + if newCount := self.decrementCount(); newCount == 0 { + time.AfterFunc(100*time.Millisecond, self.startExtraWorkerIfQueueBusy) + } }() if initialWork != nil {