-
Notifications
You must be signed in to change notification settings - Fork 8
/
webtail.go
82 lines (70 loc) · 2.42 KB
/
webtail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Package webtail holds tailer service
// You don't need anything except Service methods
package webtail
import (
"net/http"
"sync"
"github.com/go-logr/logr"
)
// codebeat:disable[TOO_MANY_IVARS]
// Config defines local application flags
type Config struct {
Root string `long:"root" default:"log/" description:"Root directory for log files"`
Bytes int64 `long:"bytes" default:"5000" description:"tail from the last Nth location"`
Lines int `long:"lines" default:"100" description:"keep N old lines for new consumers"`
MaxLineSize int `long:"split" default:"180" description:"split line if longer"`
ListCache int `long:"cache" default:"2" description:"Time to cache file listing (sec)"`
Poll bool `long:"poll" description:"use polling, instead of inotify"`
Trace bool `long:"trace" description:"trace worker channels"`
ClientBufferSize int `long:"out_buf" default:"256" description:"Client Buffer Size"`
WSReadBufferSize int `long:"ws_read_buf" default:"1024" description:"WS Read Buffer Size"`
WSWriteBufferSize int `long:"ws_write_buf" default:"1024" description:"WS Write Buffer Size"`
}
// codebeat:enable[TOO_MANY_IVARS]
// Service holds WebTail service
type Service struct {
cfg *Config
hub *Hub
wg *sync.WaitGroup
log logr.Logger
}
// New creates WebTail service
func New(log logr.Logger, cfg *Config) (*Service, error) {
tail, err := NewTailService(log, cfg)
if err != nil {
return nil, err
}
var wg sync.WaitGroup
hub := NewHub(log, tail, &wg)
service := Service{cfg: cfg, hub: hub, log: log, wg: &wg}
return &service, nil
}
// Run runs a message hub
func (wt *Service) Run() {
wt.hub.Run()
}
// Close stops a message hub
func (wt *Service) Close() {
wt.log.Info("Service Exiting")
wt.hub.Close()
wt.wg.Wait()
}
// Handle handles websocket requests from the peer
func (wt *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
wsUpgrader := upgrader(wt.cfg.WSReadBufferSize, wt.cfg.WSWriteBufferSize)
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
wt.log.Error(err, "Upgrade connection")
return
}
client := &Client{
conn: conn,
send: make(chan []byte, wt.cfg.ClientBufferSize),
log: wt.log,
}
wt.hub.register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.runWritePump(wt.wg)
go client.runReadPump(wt.wg, wt.hub.unregister, wt.hub.broadcast)
}