-
Notifications
You must be signed in to change notification settings - Fork 8
/
indexer.go
140 lines (126 loc) · 3.33 KB
/
indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package webtail
// This file holds directory tree indexer methods
import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/dc0d/dirwatch"
"github.com/go-logr/logr"
)
// IndexItemAttr holds File (index item) Attrs
type IndexItemAttr struct {
ModTime time.Time `json:"mtime"`
Size int64 `json:"size"`
}
// IndexItemAttrStore holds all index items
type IndexItemAttrStore map[string]*IndexItemAttr
type indexWorker struct {
out chan *IndexItemEvent
quit chan struct{}
log logr.Logger
root string
}
// IndexerRun runs indexer
func (ts *TailService) IndexerRun(out chan *IndexItemEvent, wg *sync.WaitGroup) {
quit := make(chan struct{})
ts.workers[""] = &TailAttr{Quit: quit}
readyChan := make(chan struct{})
go indexWorker{
out: out,
quit: quit,
log: ts.log,
root: ts.Config.Root,
}.run(readyChan, wg)
<-readyChan
err := loadIndex(ts.index, ts.Config.Root, time.Now())
if err != nil {
ts.log.Error(err, "Path walk")
}
ts.log.V(1).Info("Indexer started")
}
// IndexKeys returns sorted index keys
func (ts *TailService) IndexKeys() []string {
items := ts.index
// To store the keys in slice in sorted order
keys := make([]string, len(items))
i := 0
for k := range items {
keys[i] = k
i++
}
sort.Strings(keys)
return keys
}
// IndexItem returns index item
func (ts *TailService) IndexItem(key string) *IndexItemAttr {
return ts.index[key]
}
// IndexUpdate updates TailService index item
func (ts *TailService) IndexUpdate(msg *IndexItemEvent) {
if !msg.Deleted {
ts.index[msg.Name] = &IndexItemAttr{ModTime: msg.ModTime, Size: msg.Size}
return
}
if _, ok := ts.index[msg.Name]; ok {
ts.log.Info("Deleting path from index", "path", msg.Name)
items := ts.index
for k := range items {
if strings.HasPrefix(k, msg.Name) {
delete(ts.index, k)
}
}
}
}
// run runs indexer worker
func (iw indexWorker) run(readyChan chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
defer func() {
wg.Done()
iw.log.V(1).Info("Indexer stopped")
}()
notify := func(ev dirwatch.Event) {
iw.log.Info("Handling file event", "event", ev)
if err := sendUpdate(iw.out, iw.root, ev.Name); err != nil {
iw.log.Error(err, "Cannot get stat for file", "filepath", ev.Name)
}
}
logger := func(args ...interface{}) {} // Is it called ever?
watcher := dirwatch.New(dirwatch.Notify(notify), dirwatch.Logger(logger))
defer watcher.Stop()
watcher.Add(iw.root, true)
readyChan <- struct{}{}
<-iw.quit
}
// sendUpdate sends index update to out channel
func sendUpdate(out chan *IndexItemEvent, root, filePath string) error {
dir := strings.TrimSuffix(root, "/")
p := strings.TrimPrefix(filePath, dir+"/")
f, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
out <- &IndexItemEvent{Name: p, Deleted: true}
} else {
return err
}
} else if !f.IsDir() {
out <- &IndexItemEvent{Name: p, ModTime: f.ModTime(), Size: f.Size()}
}
return nil
}
// loadIndex loads index items for the first time
func loadIndex(index IndexItemAttrStore, root string, lastmod time.Time) error {
dir := strings.TrimSuffix(root, "/")
err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
if !f.IsDir() {
if f.ModTime().Before(lastmod) {
p := strings.TrimPrefix(path, dir+"/")
index[p] = &IndexItemAttr{ModTime: f.ModTime(), Size: f.Size()}
}
}
return nil
})
return err
}