Skip to content

Commit

Permalink
fix: 过期worker
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Oct 26, 2019
1 parent b9ac0f9 commit aa2cbe4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 19 deletions.
21 changes: 11 additions & 10 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type Dispatcher struct {
func New(setters ...Setter) (*Dispatcher, error) {
d := Dispatcher{
MaxWorkerNum: 16,
JobSize: 16,
WorkerIdleTimeout: time.Second * 60,
MaxJobRunningTimeout: 10 * time.Second,
}
Expand All @@ -43,17 +42,13 @@ func New(setters ...Setter) (*Dispatcher, error) {
return nil, errors.New("must have at least one worker in the pool")
}

if d.JobSize < 1 {
return nil, errors.New("must have at least one job buffered in the channel")
}

if d.monitor == nil {
return nil, errors.New("no monitor provided")
}

d.WorkerPool = make(chan *Worker, d.MaxWorkerNum)
d.workers = make(map[*Worker]struct{})
d.jobs = make(chan *Job, d.JobSize)
d.jobs = make(chan *Job, 1)

d.dispatch()

Expand Down Expand Up @@ -108,19 +103,25 @@ func (d *Dispatcher) dispatch() {
select {
case w := <-d.WorkerPool:
if w.IsClosed() {
d.logger.Debug("worker is closed")
d.logger.Debug("Worker is closed and creating an new worker to submit a task.")
NewWorker(d).submit(j)
} else {
d.logger.Debug("Worker is ready to submit a task.")
w.submit(j)
}
default:
if d.RunningWorkerNum() < d.MaxWorkerNum {
d.logger.Debug("not reach limit yet, create new worker")
d.logger.Debug("not reach limit yet, create a new worker to submit a task.")
NewWorker(d).submit(j)
} else {
d.logger.Debug("reach limit, wait a ready worker")
w := <-d.WorkerPool
w.submit(j)
if w.IsClosed() {
d.logger.Debug("reach limit, wait a closed worker")
NewWorker(d).submit(j)
} else {
d.logger.Debug("reach limit, wait a ready worker")
w.submit(j)
}
}
}
}
Expand Down
8 changes: 0 additions & 8 deletions setter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ func WithMaxWorkerNum(num int) Setter {
}
}

// WithJobSize configures job buffer size.
func WithJobSize(size int) Setter {
return func(d *Dispatcher) error {
d.JobSize = size
return nil
}
}

// WithWorkerIdleTimeout configures worker idle timeout.
func WithWorkerIdleTimeout(t time.Duration) Setter {
return func(d *Dispatcher) error {
Expand Down
2 changes: 1 addition & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (w *Worker) IsClosed() bool {
}

func (w *Worker) submit(job *Job) {
w.SetRunning(true)
w.Jobs <- job
}

Expand Down Expand Up @@ -85,7 +86,6 @@ func (w *Worker) schedule() {
case j := <-w.Jobs:
{
jr = j
w.SetRunning(true)
ctx, cancel := context.WithTimeout(context.TODO(), w.d.MaxJobRunningTimeout)
if err := j.Fn(ctx, j.Arg); err != nil {
w.d.monitor(fmt.Errorf("job = %#v, err = %#v", j, err))
Expand Down

0 comments on commit aa2cbe4

Please sign in to comment.