Skip to content

Commit

Permalink
refactor: 去掉回收worker的机制
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Oct 27, 2019
1 parent aa2cbe4 commit 9ed287c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 125 deletions.
35 changes: 14 additions & 21 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
type Dispatcher struct {
guard sync.RWMutex
MaxWorkerNum int // maximum worker num in the pool
JobSize int // job buffer size
WorkerIdleTimeout time.Duration // worker
MaxJobRunningTimeout time.Duration // job execution timeout
WorkerPool chan *Worker // worker pool
workers map[*Worker]struct{}
Expand All @@ -28,7 +26,6 @@ type Dispatcher struct {
func New(setters ...Setter) (*Dispatcher, error) {
d := Dispatcher{
MaxWorkerNum: 16,
WorkerIdleTimeout: time.Second * 60,
MaxJobRunningTimeout: 10 * time.Second,
}

Expand All @@ -50,8 +47,6 @@ func New(setters ...Setter) (*Dispatcher, error) {
d.workers = make(map[*Worker]struct{})
d.jobs = make(chan *Job, 1)

d.dispatch()

logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{
TimestampFormat: "2006-01-02 15:04:05",
Expand All @@ -60,6 +55,8 @@ func New(setters ...Setter) (*Dispatcher, error) {
logger.SetLevel(logrus.InfoLevel)
d.logger = logger

d.dispatch()

return &d, nil
}

Expand Down Expand Up @@ -97,31 +94,27 @@ func (d *Dispatcher) remove(w *Worker) {
d.guard.Unlock()
}

func (d *Dispatcher) NewWorker() *Worker {
w := newWorker(d)
d.add(w)
return w
}

func (d *Dispatcher) dispatch() {
go func() {
for j := range d.jobs {
select {
case w := <-d.WorkerPool:
if w.IsClosed() {
d.logger.Debug("Worker is closed and creating an new worker to submit a task.")
NewWorker(d).submit(j)
} else {
d.logger.Debug("Worker is ready to submit a task.")
w.submit(j)
}
d.logger.Debug("Worker is ready to submit a task.")
w.submit(j)
default:
if d.RunningWorkerNum() < d.MaxWorkerNum {
d.logger.Debug("not reach limit yet, create a new worker to submit a task.")
NewWorker(d).submit(j)
d.logger.Debug("Not reach limit yet, create a new worker to submit a task.")
d.NewWorker().submit(j)
} else {
w := <-d.WorkerPool
if w.IsClosed() {
d.logger.Debug("reach limit, wait a closed worker")
NewWorker(d).submit(j)
} else {
d.logger.Debug("reach limit, wait a ready worker")
w.submit(j)
}
d.logger.Debug("Reach limit, wait a ready worker")
w.submit(j)
}
}
}
Expand Down
10 changes: 0 additions & 10 deletions setter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package curlew

import "time"

// Setter configures a Dispatcher.
type Setter func(d *Dispatcher) error

Expand All @@ -13,14 +11,6 @@ func WithMaxWorkerNum(num int) Setter {
}
}

// WithWorkerIdleTimeout configures worker idle timeout.
func WithWorkerIdleTimeout(t time.Duration) Setter {
return func(d *Dispatcher) error {
d.WorkerIdleTimeout = t
return nil
}
}

// WithMonitor configures a monitor.
func WithMonitor(monitor Monitor) Setter {
return func(d *Dispatcher) error {
Expand Down
122 changes: 28 additions & 94 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,120 +4,54 @@ import (
"context"
"fmt"
"sync"
"time"
)

type Worker struct {
guard sync.RWMutex
d *Dispatcher
Jobs chan *Job
lastBusyTime time.Time
workerIdleTimeout time.Duration
running bool
closeChan chan struct{}
isClosed bool
guard sync.RWMutex
d *Dispatcher
Jobs chan *Job
}

func NewWorker(d *Dispatcher) *Worker {
w := new(Worker)
func newWorker(d *Dispatcher) *Worker {
w := &Worker{}
w.d = d
w.Jobs = make(chan *Job, 1)
w.workerIdleTimeout = d.WorkerIdleTimeout
w.closeChan = make(chan struct{})
w.isClosed = false
w.schedule()
w.d = d
d.add(w)
return w
}

func (w *Worker) LastBusyTime() time.Time {
w.guard.RLock()
defer w.guard.RUnlock()
return w.lastBusyTime
}

func (w *Worker) SetLastBusyTime() {
w.guard.Lock()
defer w.guard.Unlock()
w.lastBusyTime = time.Now().UTC()
}

func (w *Worker) close() {
w.guard.Lock()
defer w.guard.Unlock()
close(w.closeChan)
w.isClosed = true
}

func (w *Worker) IsClosed() bool {
w.guard.Lock()
defer w.guard.Unlock()
return w.isClosed
}

func (w *Worker) submit(job *Job) {
w.SetRunning(true)
w.Jobs <- job
func (w *Worker) submit(j *Job) {
if j == nil {
return
}
w.Jobs <- j
}

func (w *Worker) schedule() {

go func() {
ticker := time.NewTicker(w.workerIdleTimeout)
defer ticker.Stop()
for range ticker.C {
if w.canClose() {
w.close()
w.d.remove(w)
return
}
}
}()
go func() {
var jr *Job
defer func() {
if r := recover(); r != nil {
w.d.monitor(fmt.Errorf("job crash: job = %#v, err = %#v", jr, r))
}
}()
for {
select {
case <-w.closeChan:
return
case j := <-w.Jobs:
{
jr = j
ctx, cancel := context.WithTimeout(context.TODO(), w.d.MaxJobRunningTimeout)
if err := j.Fn(ctx, j.Arg); err != nil {
w.d.monitor(fmt.Errorf("job = %#v, err = %#v", j, err))
}
cancel()
w.SetLastBusyTime()
w.SetRunning(false)
w.d.WorkerPool <- w
done := make(chan struct{})
go func() {
defer func() {
w.d.WorkerPool <- w
done <- struct{}{}
if r := recover(); r != nil {
w.d.monitor(fmt.Errorf("job crash: job = %#v, err = %#v", 1, r))
}
}()
ctx, cancel := context.WithTimeout(context.TODO(), w.d.MaxJobRunningTimeout)
defer cancel()
if err := j.Fn(ctx, j.Arg); err != nil {
w.d.monitor(fmt.Errorf("job = %#v, err = %#v", j, err))
}
}()
<-done
}
}
}
}()
}

func (w *Worker) SetRunning(b bool) {
w.guard.Lock()
defer w.guard.Unlock()
w.running = b
}

func (w *Worker) Running() bool {
w.guard.RLock()
defer w.guard.RUnlock()
return w.running
}

func (w *Worker) canClose() bool {
w.guard.RLock()
defer w.guard.RUnlock()
w.d.logger.Debugf("worker running status: %v, deadline: %v, now: %v", w.running, w.lastBusyTime.Add(w.workerIdleTimeout), time.Now().UTC())
if !w.running && w.lastBusyTime.Add(w.workerIdleTimeout).Before(time.Now().UTC()) {
return true
}
return false
}

0 comments on commit 9ed287c

Please sign in to comment.