-
Notifications
You must be signed in to change notification settings - Fork 0
/
dispatcher.go
114 lines (96 loc) · 2.12 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package dispatcher
import (
"sync"
"sync/atomic"
"time"
)
// Dispatcher defines an interface for a function dispatcher
type Dispatcher interface {
// Do causes function to be scheduled for execution.
// Execution trigger is implementation specific.
Do(f func())
// IsRunning provides activity status of the dispatcher
IsRunning() bool
}
// dispatcher implements Dispatcher interface such that is allows user
// to limit the number of active goroutines running at a time.
type dispatcher struct {
queue *queue
cap int32
active *int32
poke chan struct{}
mu sync.Mutex
}
// New provides a new instance of dispatcher
func New(numConcurrent int32) *dispatcher {
d := new(dispatcher)
d.queue = new(queue)
d.cap = numConcurrent
d.active = new(int32)
d.poke = make(chan struct{})
d.bot() // starts a daemon that will schedule pending funcs
return d
}
// queue is a queue of func
type queue []func()
func (s *queue) len() int {
return len(*s)
}
// push pushes new entry at the end of the queue
func (s *queue) push(f func()) {
*s = append(*s, f)
}
// pop pulls from the front of the queue
func (s *queue) pop() func() {
if len(*s) > 0 {
f := (*s)[0]
*s = (*s)[1:]
return f
}
return nil
}
func (d *dispatcher) IsRunning() bool {
return *(d.active) > 0
}
func (d *dispatcher) pending() int {
return d.queue.len()
}
func (d *dispatcher) Do(f func()) {
// lock
d.mu.Lock()
defer d.mu.Unlock()
// push into queue
d.queue.push(f)
d.dispatch()
}
// dispatch is an internal function
func (d *dispatcher) dispatch() {
for *(d.active) < d.cap {
f := d.queue.pop()
if f == nil {
break
}
// increment the active counter
atomic.AddInt32(d.active, 1)
go func(active *int32, poke chan struct{}) {
f()
atomic.AddInt32(active, -1)
d.poke <- struct{}{}
}(d.active, d.poke)
}
}
// bot is an internal function that monitors the active functions and dispatches new from pending queue
func (d *dispatcher) bot() {
go func() {
// run infinite loop waiting every second
for {
d.mu.Lock()
d.dispatch()
d.mu.Unlock()
select {
case <-d.poke:
case <-time.After(time.Second):
}
}
}()
}