-
Notifications
You must be signed in to change notification settings - Fork 6
/
webhooks.go
112 lines (96 loc) · 2.24 KB
/
webhooks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package zwibserve
import (
"context"
"log"
"sync"
"time"
)
// webhookEvent is one pending webhook notification held in a webhookQueue.
type webhookEvent struct {
	// sendBy is the time after which the queue's background goroutine
	// delivers the event.
	sendBy time.Time

	// url is the endpoint that send POSTs to.
	url string

	// name and documentID are sent in the request payload under the keys
	// "event" and "documentID", and also identify the event in log output.
	name, documentID string

	// username and password are passed along as the request's credentials.
	username, password string
}
// webhookQueue holds webhook events that are waiting for their delivery time.
// Instances are expected to come from createWebhookQueue, which also starts
// the goroutine that drains the queue.
type webhookQueue struct {
	// events is the list of pending events, in the order they were added.
	events []webhookEvent

	// mutex guards events and cancel.
	mutex sync.Mutex

	// cancel interrupts the background goroutine's current sleep so that it
	// rescans events. The goroutine replaces it on every pass.
	cancel func()
}
// createWebhookQueue allocates an empty webhook queue and starts the
// background goroutine that delivers its events.
//
// On every pass the goroutine scans the queue while holding the mutex. Each
// event whose sendBy time is already in the past is handed to its own
// goroutine via send and dropped from the queue; the earliest remaining sendBy
// (or 24 hours from now when nothing is pending) becomes the next wake-up
// time. The goroutine then sleeps until that time, or until whq.cancel is
// called, and starts over.
//
// The goroutine has no stop mechanism; it runs until the process exits.
func createWebhookQueue() *webhookQueue {
	whq := &webhookQueue{
		// No-op until the goroutine installs a real cancel function, so the
		// queue can be used before the first pass has run.
		cancel: func() {},
	}

	go func() {
		for {
			ctx, cancel := context.WithCancel(context.Background())

			whq.mutex.Lock()
			now := time.Now()
			wakeAt := now.Add(24 * time.Hour)

			// Compact the slice in place: due events are dispatched and
			// skipped, everything else is shifted down to index kept.
			kept := 0
			for i := range whq.events {
				item := whq.events[i]
				if item.sendBy.Before(now) {
					go item.send()
					continue
				}
				if item.sendBy.Before(wakeAt) {
					wakeAt = item.sendBy
				}
				whq.events[kept] = item
				kept++
			}
			whq.events = whq.events[:kept]

			// Publish the cancel function before releasing the lock so a
			// caller that modifies the queue afterwards can always wake this
			// pass.
			whq.cancel = cancel
			whq.mutex.Unlock()

			// Use an explicit timer rather than time.After: when the wait is
			// interrupted, Stop releases the timer immediately instead of
			// leaving it pending for up to 24 hours.
			timer := time.NewTimer(wakeAt.Sub(now))
			select {
			case <-ctx.Done():
			case <-timer.C:
			}
			timer.Stop()

			// Release the context when the timer won the race. A later call
			// through the stale whq.cancel is then a harmless no-op, and the
			// next pass rescans the queue under the lock anyway.
			cancel()
		}
	}()

	return whq
}
// removeIf drops every queued event for which fn returns true and logs each
// removal. Events that stay keep their relative order.
//
// fn is called while the queue mutex is held, so it must not call add or
// removeIf on the same queue.
func (whq *webhookQueue) removeIf(fn func(event webhookEvent) bool) {
	whq.mutex.Lock()
	defer whq.mutex.Unlock()

	// Filter in place: kept shares the backing array with whq.events and
	// never grows past the element currently being examined.
	kept := whq.events[:0]
	for _, ev := range whq.events {
		if fn(ev) {
			log.Printf("Remove queued webhook %s/%s", ev.name, ev.documentID)
			continue
		}
		kept = append(kept, ev)
	}
	whq.events = kept
}
// add appends event to the queue, logs it, and calls whq.cancel so that the
// background goroutine wakes up and takes the new event's sendBy time into
// account.
func (whq *webhookQueue) add(event webhookEvent) {
	whq.mutex.Lock()
	defer whq.mutex.Unlock()

	log.Printf("Queue webhook %s/%s", event.name, event.documentID)

	pending := append(whq.events, event)
	whq.events = pending

	// Read and invoke cancel while still holding the lock; the goroutine
	// swaps the field under the same lock.
	wake := whq.cancel
	wake()
}
// send delivers the event with a POST request to event.url, passing the event
// name and document ID as the request data and the event's username and
// password as credentials. The resulting status code and error are logged;
// nothing is returned to the caller.
func (event webhookEvent) send() {
	payload := map[string]interface{}{
		"event":      event.name,
		"documentID": event.documentID,
	}
	args := HTTPRequestArgs{
		Method:   "POST",
		URI:      event.url,
		Data:     payload,
		Username: event.username,
		Password: event.password,
	}

	// reply is handed to MakeHTTPRequest as its output argument; its contents
	// are not used afterwards.
	var reply string
	result := MakeHTTPRequest(args, &reply)

	log.Printf("%s/%s to %s: HTTP Status=%v Error=%v", event.name, event.documentID, event.url,
		result.StatusCode, result.Err)
}