From 393acf94203b62623fb3315b46bb8be4bf32fe94 Mon Sep 17 00:00:00 2001
From: Soroosh Tanzadeh
Date: Wed, 7 Aug 2024 13:31:31 +0330
Subject: [PATCH] Improve error management (#2)

---
 runner/errors.go           | 24 ++++++++++++++++++++++++
 runner/task_runner.go      |  4 ++++
 runner/task_runner_test.go |  6 ++++--
 runner/timing.go           |  4 ++--
 runner/worker.go           | 15 +++++++--------
 5 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/runner/errors.go b/runner/errors.go
index 369a507..d94de21 100644
--- a/runner/errors.go
+++ b/runner/errors.go
@@ -5,3 +5,27 @@ type TaskRunnerError string
 func (tre TaskRunnerError) Error() string {
 	return string(tre)
 }
+
+type TaskExecutionError struct {
+	taskname string
+	err      error
+}
+
+func NewTaskExecutionError(task string, err error) TaskExecutionError {
+	return TaskExecutionError{
+		err:      err,
+		taskname: task,
+	}
+}
+
+func (t TaskExecutionError) Error() string {
+	return t.taskname + ":" + t.err.Error()
+}
+
+func (t TaskExecutionError) GetError() error {
+	return t.err
+}
+
+func (t TaskExecutionError) GetTaskName() string {
+	return t.taskname
+}
diff --git a/runner/task_runner.go b/runner/task_runner.go
index 75946a6..d4590a6 100644
--- a/runner/task_runner.go
+++ b/runner/task_runner.go
@@ -88,6 +88,10 @@ func (t *TaskRunner) ErrorChannel() chan error {
 	return t.errorChannel
 }
 
+func (t *TaskRunner) captureError(err error) {
+	t.errorChannel <- err
+}
+
 func (t *TaskRunner) Start(ctx context.Context) error {
 	if t.status.Load() != stateInit {
 		return ErrTaskRunnerAlreadyStarted
diff --git a/runner/task_runner_test.go b/runner/task_runner_test.go
index 60d3d17..5b4414c 100644
--- a/runner/task_runner_test.go
+++ b/runner/task_runner_test.go
@@ -101,6 +101,7 @@ func (t *TaskRunnerTestSuit) Test_ShouldRetryTaskBaseOnMaxRetry() {
 	counter := atomic.Int64{}
 	expectedPayload := "Test Payload"
 	expectedError := errors.New("I'm Panic Error")
+	expectedErrorWrap := NewTaskExecutionError("task", expectedError)
 
 	_, taskRunner := t.setupTaskRunner(t.setupRedis())
 	taskRunner.RegisterTask(&Task{
@@ -131,7 +132,7 @@
 			t.Assert().Equal(10, counter.Load())
 			break
 		case err := <-taskRunner.ErrorChannel():
-			if err != expectedError {
+			if err.(TaskExecutionError).GetError().Error() != expectedErrorWrap.GetError().Error() {
 				t.FailNow(err.Error())
 			}
 		case <-time.After(time.Second):
@@ -144,6 +145,7 @@ func (t *TaskRunnerTestSuit) Test_ShouldRetryTaskWhenPaniced() {
 	counter := atomic.Int64{}
 	expectedPayload := "Test Payload"
 	expectedError := errors.New("I'm Panic Error")
+	expectedErrorWrap := NewTaskExecutionError("task", expectedError)
 
 	_, taskRunner := t.setupTaskRunner(t.setupRedis())
 	taskRunner.RegisterTask(&Task{
@@ -174,7 +176,7 @@
 			t.Assert().Equal(10, counter.Load())
 			break
 		case err := <-taskRunner.ErrorChannel():
-			if err != expectedError {
+			if err.(TaskExecutionError).GetError().Error() != expectedErrorWrap.GetError().Error() {
 				t.FailNow(err.Error())
 			}
 		case <-time.After(time.Second):
diff --git a/runner/timing.go b/runner/timing.go
index c05e56d..52b9d07 100644
--- a/runner/timing.go
+++ b/runner/timing.go
@@ -45,7 +45,7 @@ func (t *TaskRunner) timingAggregator() {
 			continue
 		}
 
-		t.errorChannel <- err
+		t.captureError(err)
 		continue
 	}
 
@@ -59,7 +59,7 @@
 		avgTiming := totalExecutionAverage
 		queueLen, err := t.queue.Len()
 		if err != nil {
-			t.errorChannel <- err
+			t.captureError(err)
 			return
 		}
 		// TODO I don't know if this way of predicting is true or not
diff --git a/runner/worker.go b/runner/worker.go
index c964e32..f42351b 100644
--- a/runner/worker.go
+++ b/runner/worker.go
@@ -49,18 +49,17 @@ func (t *TaskRunner) process(ctx context.Context, workerID int) {
 			logEntry := log.WithField("worker_id", workerID).WithField("cause", r)
 			if ok {
 				logEntry = logEntry.WithError(err)
-				resultChannel <- err
+				resultChannel <- NewTaskExecutionError(task.Name, err)
 			} else {
-				resultChannel <- TaskRunnerError(fmt.Sprintf("Task %s execution failed", task.Name))
+				resultChannel <- NewTaskExecutionError(task.Name, TaskRunnerError(fmt.Sprintf("Task %s Panic: %v", task.Name, r)))
 			}
-			logEntry.Error("Task Panic")
-
+			logEntry.Errorf("Task %s Panic", task.Name)
 		}
 	}()
 
 	// Note: Deferred function calls are pushed onto a stack.
 	if err := task.Action(ctx, payload); err != nil {
-		resultChannel <- err
+		resultChannel <- NewTaskExecutionError(task.Name, err)
 	}
 	resultChannel <- true
 }
@@ -132,7 +131,7 @@ func (t *TaskRunner) process(ctx context.Context, workerID int) {
 		case result := <-resultChannel:
 			if _, ok := result.(bool); !ok {
 				failed()
-				return result.(error)
+				return result.(TaskExecutionError)
 			}
 			t.processed.Add(1)
 			return nil
@@ -140,7 +139,7 @@ func (t *TaskRunner) process(ctx context.Context, workerID int) {
 		// Task execution is taking time, send heartbeat to prevent reClaim
 		case <-time.After(task.ReservationTimeout):
 			if err := hbf(ctx); err != nil {
-				t.errorChannel <- err
+				t.captureError(err)
 			}
 		}
 	}
@@ -151,7 +150,7 @@ func (t *TaskRunner) afterProcess(task *Task, payload any) {
 	if task.Unique {
 		err := t.releaseLock(task.lockKey(payload), task.lockValue)
 		if err != nil {
-			t.errorChannel <- err
+			t.captureError(err)
 		}
 	}
 }
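
Usage sketch (not part of the patch itself): with this change, every task
failure a worker reports on ErrorChannel() is a TaskExecutionError, so a
consumer can branch on the wrapped type. Only TaskRunner, ErrorChannel,
TaskExecutionError, GetTaskName, and GetError below come from this
repository; the package "consumer", the import path
"example.com/project/runner", and the helper DrainErrors are hypothetical
names used for illustration.

// Package consumer is a hypothetical example package.
package consumer

import (
	"errors"
	"log"

	"example.com/project/runner" // hypothetical import path for the runner package
)

// DrainErrors logs everything emitted on the runner's error channel,
// separating wrapped task failures from plain infrastructure errors.
func DrainErrors(tr *runner.TaskRunner) {
	for err := range tr.ErrorChannel() {
		var execErr runner.TaskExecutionError
		if errors.As(err, &execErr) {
			// Task failures are wrapped by the worker with the task name.
			log.Printf("task %q failed: %v", execErr.GetTaskName(), execErr.GetError())
			continue
		}
		// Timing, heartbeat, and lock-release errors arrive unwrapped
		// via captureError.
		log.Printf("runner error: %v", err)
	}
}

Because TaskExecutionError does not define Unwrap, errors.Is against the
underlying error will not match through the wrapper; asserting on the
concrete type as above, or comparing GetError() as the updated tests do, is
the reliable check.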