-
Notifications
You must be signed in to change notification settings - Fork 6
/
walker.go
128 lines (107 loc) · 2.62 KB
/
walker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package walker
import (
"context"
"os"
"path/filepath"
"runtime"
"sync/atomic"
"golang.org/x/sync/errgroup"
)
// Walk wraps WalkWithContext using the background context.
func Walk(root string, walkFn func(pathname string, fi os.FileInfo) error, opts ...Option) error {
return WalkWithContext(context.Background(), root, walkFn, opts...)
}
// WalkWithContext walks the file tree rooted at root, calling walkFn for each
// file or directory in the tree, including root.
//
// If fastWalk returns filepath.SkipDir, the directory is skipped.
//
// Multiple goroutines stat the filesystem concurrently. The provided
// walkFn must be safe for concurrent use.
func WalkWithContext(ctx context.Context, root string, walkFn func(pathname string, fi os.FileInfo) error, opts ...Option) error {
wg, ctx := errgroup.WithContext(ctx)
fi, err := os.Lstat(root)
if err != nil {
return err
}
if err = walkFn(root, fi); err == filepath.SkipDir {
return nil
}
if err != nil || !fi.IsDir() {
return err
}
cpuLimit := runtime.GOMAXPROCS(-1)
if cpuLimit < 4 {
cpuLimit = 4
}
w := walker{
counter: 1,
options: walkerOptions{
limit: cpuLimit,
},
ctx: ctx,
wg: wg,
fn: walkFn,
}
for _, o := range opts {
err := o(&w.options)
if err != nil {
return err
}
}
w.wg.Go(func() error {
return w.gowalk(root)
})
return w.wg.Wait()
}
type walker struct {
counter uint32
ctx context.Context
wg *errgroup.Group
fn func(pathname string, fi os.FileInfo) error
options walkerOptions
}
func (w *walker) walk(dirname string, fi os.FileInfo) error {
pathname := dirname + string(filepath.Separator) + fi.Name()
err := w.fn(pathname, fi)
if err == filepath.SkipDir {
return nil
}
if err != nil {
return err
}
// don't follow symbolic links
if fi.Mode()&os.ModeSymlink != 0 {
return nil
}
if !fi.IsDir() {
return nil
}
if err = w.ctx.Err(); err != nil {
return err
}
current := atomic.LoadUint32(&w.counter)
// if we haven't reached our goroutine limit, spawn a new one
if current < uint32(w.options.limit) {
if atomic.CompareAndSwapUint32(&w.counter, current, current+1) {
w.wg.Go(func() error {
return w.gowalk(pathname)
})
return nil
}
}
// if we've reached our limit, continue with this goroutine
err = w.readdir(pathname)
if err != nil && w.options.errorCallback != nil {
err = w.options.errorCallback(pathname, err)
}
return err
}
func (w *walker) gowalk(pathname string) error {
err := w.readdir(pathname)
if err != nil && w.options.errorCallback != nil {
err = w.options.errorCallback(pathname, err)
}
atomic.AddUint32(&w.counter, ^uint32(0))
return err
}