-
Notifications
You must be signed in to change notification settings - Fork 3
/
analyzer.go
145 lines (129 loc) · 2.71 KB
/
analyzer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
	"bufio"
	"context"
	"encoding/json"
	"io"
	"sync"
	"sync/atomic"
)
// bufSize is the capacity of the buffered channel that carries lines from the
// producer goroutine to the consumers.
const bufSize = 1000
// Analyzer reads a log stream line by line and runs Func on every line from
// Procs concurrent consumer goroutines.
//
// An Analyzer contains a sync.WaitGroup and a counter that is updated
// atomically, so use it through a pointer and do not copy it after Go has been
// called.
type Analyzer struct {
	// bytesRead counts the bytes the producer has pulled from Log. It is only
	// accessed through sync/atomic and is kept as the FIRST field on purpose:
	// on 32-bit platforms (386, ARM, 32-bit MIPS) only the first word of an
	// allocated struct is guaranteed to be 64-bit aligned, and a misaligned
	// 64-bit atomic operation panics.
	bytesRead int64

	// Procs is the number of consumer goroutines Go starts.
	Procs int
	// Path is copied into every Result and used in read-error messages.
	Path string
	// Log is the stream the lines are read from.
	Log io.Reader
	// Func is invoked once per line; a nil return means "nothing to report".
	Func AnalyzerFunc
	// Offset is the offset assigned to the first line read from Log; later
	// lines are numbered relative to it.
	Offset int64
	// wg tracks the producer and all consumers so the result channel can be
	// closed once every goroutine has finished.
	wg sync.WaitGroup
	// VerboseResults keeps Result.Match in emitted results; when false the
	// match text is blanked before the result is sent.
	VerboseResults bool
}
// discardInterface is the method set DiscardWriter requires from its sink:
// ordinary writes plus the io.ReaderFrom path.
type discardInterface interface {
	io.Writer
	io.ReaderFrom
}

// DiscardWriter passes everything it is given to an underlying sink and keeps
// a running total of how many bytes went through it.
type DiscardWriter struct {
	discardWriter discardInterface
	// BytesRead is the total number of bytes reported by the sink so far. It
	// is updated with a plain addition, without any synchronization.
	BytesRead int64
}

// NewDiscardWriter returns a DiscardWriter backed by io.Discard.
// It panics if io.Discard does not also implement io.ReaderFrom.
func NewDiscardWriter() *DiscardWriter {
	w := new(DiscardWriter)
	w.discardWriter = io.Discard.(discardInterface)
	return w
}

// Write hands b to the sink and adds the number of bytes the sink reports to
// BytesRead. The sink's return values are passed through unchanged.
func (d *DiscardWriter) Write(b []byte) (int, error) {
	written, err := d.discardWriter.Write(b)
	d.BytesRead = d.BytesRead + int64(written)
	return written, err
}

// ReadFrom drains r through the sink's ReadFrom and adds the number of bytes
// consumed to BytesRead. The sink's return values are passed through unchanged.
func (d *DiscardWriter) ReadFrom(r io.Reader) (int64, error) {
	consumed, err := d.discardWriter.ReadFrom(r)
	d.BytesRead = d.BytesRead + consumed
	return consumed, err
}
// AnalyzerFunc inspects one log line and returns a *Result describing what it
// found. The consumer skips nil results, so returning nil means the line
// produced nothing to report.
type AnalyzerFunc func([]byte) *Result
// Result is one finding produced by an AnalyzerFunc for a single log line.
type Result struct {
	// Path identifies the log; the consumer fills it in from Analyzer.Path.
	Path string `json:"path"`
	// Match is the matched text; the consumer blanks it unless
	// Analyzer.VerboseResults is set.
	Match string `json:"match,omitempty"`
	// Err is an error attached to this result, if any.
	Err error `json:"error,omitempty"`
	// Offset is the offset of the line that produced the result; the consumer
	// fills it in from LineMsg.Offset.
	Offset int64 `json:"offset"`
}

// MarshalJSON encodes r with the same keys and key order as the struct tags,
// but renders Err as its message text.
//
// Without this method encoding/json would reflect over the concrete error
// value, and ordinary errors (errors.New, fmt.Errorf) have no exported fields,
// so they would be written as "error":{} and the message would be lost. An
// error that implements json.Marshaler itself is still encoded by its own
// marshaler.
func (r Result) MarshalJSON() ([]byte, error) {
	// wire mirrors Result field for field but has no methods, so marshaling it
	// cannot recurse back into this MarshalJSON.
	type wire struct {
		Path   string      `json:"path"`
		Match  string      `json:"match,omitempty"`
		Err    interface{} `json:"error,omitempty"`
		Offset int64       `json:"offset"`
	}

	out := wire{Path: r.Path, Match: r.Match, Offset: r.Offset}
	switch e := r.Err.(type) {
	case nil:
		// Leave out.Err nil so omitempty drops the key entirely.
	case json.Marshaler:
		out.Err = e
	default:
		out.Err = e.Error()
	}
	return json.Marshal(out)
}
// LineMsg is the unit of work passed from the producer goroutine to the
// consumers: one line of the log plus where it starts.
type LineMsg struct {
	// Line holds the raw bytes of the line as returned by
	// bufio.Reader.ReadBytes('\n'), i.e. including the trailing newline when
	// one was present.
	Line []byte
	// Offset is the position of the first byte of Line, counted from
	// Analyzer.Offset.
	Offset int64
}
// Go starts the analysis: one producer goroutine that reads lines from Log and
// a set of consumer goroutines that run Func on them. It returns immediately
// with the channel on which results are delivered.
//
// The returned channel is unbuffered and is closed once the producer and every
// consumer have finished, so callers can simply range over it. Cancelling ctx
// makes the goroutines stop at their next channel operation.
//
// A Procs value below 1 is treated as 1. A negative value would otherwise make
// WaitGroup.Add panic, and zero would start no consumers at all, so no line
// would ever be analysed and the producer would block for good once the line
// buffer filled.
func (a *Analyzer) Go(ctx context.Context) <-chan Result {
	workers := a.Procs
	if workers < 1 {
		workers = 1
	}

	resultC := make(chan Result)

	// Register the consumers before anything is started so that the closer
	// goroutine below can never observe a zero counter too early.
	a.wg.Add(workers)
	producer := a.startProducer(ctx)

	// Close the result channel only after every goroutine that may still send
	// on it (or feed a sender) is done.
	go func() {
		a.wg.Wait()
		close(resultC)
	}()

	for i := 0; i < workers; i++ {
		go a.consumer(ctx, producer, resultC)
	}
	return resultC
}
// BytesRead reports how many bytes the producer has read from Log so far.
// The counter is loaded atomically, matching the atomic add done by the
// producer, so it can be polled while the analysis is running.
func (a *Analyzer) BytesRead() int64 {
	total := atomic.LoadInt64(&a.bytesRead)
	return total
}
// startProducer launches the goroutine that reads Log line by line and returns
// the channel the lines are published on.
//
// Each line is sent as a LineMsg carrying its starting offset; offsets begin at
// a.Offset and advance by the length of every line that was sent. The channel
// is buffered (bufSize) and is closed when the goroutine exits, which happens
// at end of input, on an empty read, or when ctx is cancelled while a send is
// pending. The goroutine registers itself with a.wg.
func (a *Analyzer) startProducer(ctx context.Context) <-chan LineMsg {
	a.wg.Add(1)

	out := make(chan LineMsg, bufSize)
	offset := a.Offset
	// 32 MiB read buffer.
	reader := bufio.NewReaderSize(a.Log, 32*1024*1024)

	go func() {
		defer a.wg.Done()
		defer close(out)

		for {
			chunk, readErr := reader.ReadBytes('\n')
			if readErr != nil && readErr != io.EOF {
				// NOTE(review): fatal is defined elsewhere; presumably it
				// terminates the process — confirm. If it returns, the loop
				// carries on with whatever was read.
				fatal("error while scanning log: %s: %s\n", a.Path, readErr)
			}

			size := int64(len(chunk))
			atomic.AddInt64(&a.bytesRead, size)

			// Nothing was read: there is no line to publish, so stop.
			if size == 0 {
				return
			}

			select {
			case <-ctx.Done():
				return
			case out <- LineMsg{Line: chunk, Offset: offset}:
				offset += size
			}

			// A final line without a trailing newline arrives together with
			// io.EOF; it has been published above, so we are done.
			if readErr == io.EOF {
				return
			}
		}
	}()

	return out
}
// consumer is the body of one worker goroutine. It takes lines from producer,
// runs a.Func on each, and forwards every non-nil result on results after
// stamping it with a.Path and the line's offset.
//
// It returns when ctx is cancelled or producer is closed, and signals a.wg on
// the way out. A result that is still waiting to be sent when ctx is cancelled
// is dropped.
func (a *Analyzer) consumer(ctx context.Context, producer <-chan LineMsg, results chan<- Result) {
	defer a.wg.Done()

	for {
		select {
		case <-ctx.Done():
			return

		case item, open := <-producer:
			if !open {
				return
			}

			found := a.Func(item.Line)
			if found == nil {
				// Nothing to report for this line.
				continue
			}

			found.Path, found.Offset = a.Path, item.Offset
			if !a.VerboseResults {
				// Strip the matched text unless verbose output was requested.
				found.Match = ""
			}

			// Never block forever on a reader that has gone away.
			select {
			case results <- *found:
			case <-ctx.Done():
			}
		}
	}
}
/*
func NoopAnalyzerFunc(line []byte) *Result {
return nil
}
*/