-
Notifications
You must be signed in to change notification settings - Fork 0
/
parallel.go
95 lines (81 loc) · 1.92 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package flash
import (
"context"
"fmt"
"sync"
uuid "github.com/satori/go.uuid"
)
// Parallel is an executor that runs all of its executables concurrently,
// one goroutine per executable, and waits for every one of them to finish.
type Parallel struct {
// executor is the embedded base (declared elsewhere in the package); the
// methods in this file read its id and executables and call its Execute,
// OnSuccess and OnFailure.
executor
// wg tracks the goroutines started by executeWg.
wg *sync.WaitGroup
// successHandler, when non-nil, is invoked by OnSuccess.
successHandler func()
// failHandler, when non-nil, is invoked by OnFailure with the reported error.
failHandler func(err error)
}
// ParallelOption configures a Parallel executor. NewParallel applies each
// option to the freshly built executor, in the order given.
type ParallelOption func(parallel *Parallel)
// NewParallel builds a parallel executor bound to ctx.
//
// The executor receives an id of the form "parallel-<uuid v4>" and a fresh
// WaitGroup. Each option in opts is then applied, in order, to the new
// executor before it is returned.
func NewParallel(ctx context.Context, opts ...ParallelOption) *Parallel {
	par := new(Parallel)
	par.executor.id = fmt.Sprintf("%s-%s", "parallel", uuid.NewV4().String())
	par.executor.ctx = ctx
	par.wg = new(sync.WaitGroup)

	for i := range opts {
		opts[i](par)
	}
	return par
}
// ParallelFailHandler returns an option that installs fail as the callback
// invoked by Parallel.OnFailure.
func ParallelFailHandler(fail func(err error)) ParallelOption {
	install := func(target *Parallel) {
		target.failHandler = fail
	}
	return install
}
// ParallelSuccessHandler returns an option that installs success as the
// callback invoked by Parallel.OnSuccess.
func ParallelSuccessHandler(success func()) ParallelOption {
	install := func(target *Parallel) {
		target.successHandler = success
	}
	return install
}
// Execute runs every executable in parallel and blocks until all of them
// have finished.
//
// The embedded executor's Execute is called first; if it reports an error,
// that error is returned and nothing is started. Otherwise the result is
// always nil: failures of individual executables are logged and delivered
// to their own OnFailure callbacks instead of being returned here.
func (p *Parallel) Execute() error {
	err := p.executor.Execute()
	if err != nil {
		return err
	}

	p.executeWg()
	return nil
}
// executeWg starts one goroutine per executable and waits for all of them.
//
// An executable that already reports IsCompleted is skipped. For the rest,
// Execute is called; the outcome is logged under the executor id and then
// passed to the executable's OnSuccess or OnFailure callback.
func (p *Parallel) executeWg() {
	// worker handles the executable at position idx. Elements are always
	// accessed through the slice so the call targets the stored element.
	worker := func(idx int) {
		defer p.wg.Done()

		if p.executables[idx].IsCompleted() {
			return
		}

		err := p.executables[idx].Execute()
		if err == nil {
			log(p.id).Infof("completed executing: %+v", p.executables[idx])
			p.executables[idx].OnSuccess()
			return
		}

		log(p.id).Errorf("error while executing: %+v", err)
		p.executables[idx].OnFailure(err)
	}

	// Register every worker up front so Wait cannot return early.
	p.wg.Add(len(p.executables))
	for idx := range p.executables {
		go worker(idx)
	}
	p.wg.Wait()
}
// OnSuccess is the completion callback: it forwards to the embedded
// executor's OnSuccess and then calls the configured success handler, if any.
func (p *Parallel) OnSuccess() {
	p.executor.OnSuccess()

	if handler := p.successHandler; handler != nil {
		handler()
	}
}
// OnFailure is the failure callback: it forwards err to the embedded
// executor's OnFailure and then passes it to the configured fail handler,
// if any.
func (p *Parallel) OnFailure(err error) {
	p.executor.OnFailure(err)

	if handler := p.failHandler; handler != nil {
		handler(err)
	}
}