Skip to content

Commit

Permalink
feat: recover fn
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Jun 17, 2020
1 parent 52052b3 commit 63ccd84
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
1 change: 1 addition & 0 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (d *Dispatcher) remove(w *Worker) {
d.guard.Unlock()
}

// NewWorker 生成一个工作协程
func (d *Dispatcher) NewWorker() *Worker {
w := newWorker(d)
d.add(w)
Expand Down
1 change: 1 addition & 0 deletions monitor.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
package curlew

// Monitor 监控
type Monitor func(err error)
23 changes: 16 additions & 7 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
)

// Worker 工作协程
type Worker struct {
guard sync.RWMutex
d *Dispatcher
Expand All @@ -28,20 +29,28 @@ func (w *Worker) submit(j *Job) {
}

func (w *Worker) schedule() {

go func() {
for {
select {
case j := <-w.Jobs:
{
ctx, cancel := context.WithTimeout(context.TODO(), w.d.MaxJobRunningTimeout)
if err := j.Fn(ctx, j.Arg); err != nil {
w.d.monitor(fmt.Errorf("job = %#v, err = %#v", j, err))
}
cancel()
w.d.WorkerPool <- w
w.exec(j)
}
}
}
}()
}

func (w *Worker) exec(j *Job) {
ctx, cancel := context.WithTimeout(context.TODO(), w.d.MaxJobRunningTimeout)
defer func() {
if r := recover(); r != nil {
w.d.monitor(fmt.Errorf("fn panic: job = %#v, recover() = %#v", j, r))
}
cancel()
}()
if err := j.Fn(ctx, j.Arg); err != nil {
w.d.monitor(fmt.Errorf("job = %#v, err = %#v", j, err))
}
w.d.WorkerPool <- w
}

0 comments on commit 63ccd84

Please sign in to comment.