-
Notifications
You must be signed in to change notification settings - Fork 3
/
source.go
137 lines (119 loc) · 2.99 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package fp
import (
"bufio"
"io"
"reflect"
)
// Source is a pull-style stream of elements represented as reflect values.
type Source interface {
	// ElemType reports the reflect.Type of the elements this source produces.
	ElemType() reflect.Type
	// Next returns the next element together with a flag. The implementations
	// in this file return (element, true) while elements remain and
	// (reflect.Value{}, false) once the stream is exhausted.
	Next() (reflect.Value, bool)
}
// KVSource is the two-value counterpart of Source: ElemType reports two
// types and Next yields two values plus a flag.
// NOTE(review): presumably the pairs are (key, value) and the flag means
// "an element was produced", mirroring Source — no implementation is visible
// here, confirm against the implementers.
type KVSource interface {
	// ElemType reports the reflect.Type of each of the two values.
	ElemType() (reflect.Type, reflect.Type)
	// Next returns the next pair of values and a flag.
	Next() (reflect.Value, reflect.Value, bool)
}
// makeIter resolves val into its element type and a pull function.
//
// Resolution order:
//  1. A value implementing Source is used as-is.
//  2. A func accepted by isIterFunction is called once per element; its
//     first result is the element and its second the "has element" flag.
//  3. A func accepted by isIterFunction2 behaves the same, except that a
//     non-nil third result is recorded with ctx.SetErr and ends the
//     iteration with (reflect.Value{}, false).
//  4. Slices and arrays are wrapped with newSliceSource, channels with
//     newChannelSource.
//
// Any other kind panics with "not support <type>".
func makeIter(ctx context, val reflect.Value) (reflect.Type, iterator) {
	typ := val.Type()

	if src, isSource := val.Interface().(Source); isSource && src != nil {
		return src.ElemType(), src.Next
	}

	switch {
	case isIterFunction(val):
		return typ.Out(0), func() (reflect.Value, bool) {
			results := val.Call(nil)
			return results[0], results[1].Bool()
		}
	case isIterFunction2(val):
		return typ.Out(0), func() (reflect.Value, bool) {
			results := val.Call(nil)
			// A non-nil interface value always asserts to a non-nil error,
			// so a single assertion is enough here.
			if raw := results[2].Interface(); raw != nil {
				ctx.SetErr(raw.(error))
				return reflect.Value{}, false
			}
			return results[0], results[1].Bool()
		}
	}

	switch typ.Kind() {
	case reflect.Slice, reflect.Array:
		src := newSliceSource(typ.Elem(), val)
		return src.ElemType(), src.Next
	case reflect.Chan:
		src := newChannelSource(typ.Elem(), val)
		return src.ElemType(), src.Next
	}
	panic("not support " + typ.String())
}
// isIterFunction reports whether fn is a function that takes no arguments
// and returns exactly two results, the second of which has type boolType.
// NOTE(review): boolType is declared elsewhere in the package — presumably
// reflect.TypeOf(true); confirm.
func isIterFunction(fn reflect.Value) bool {
	sig := fn.Type()
	if sig.Kind() != reflect.Func {
		return false
	}
	if sig.NumIn() != 0 || sig.NumOut() != 2 {
		return false
	}
	return sig.Out(1) == boolType
}
// isIterFunction2 reports whether fn is a function that takes no arguments
// and returns exactly three results, where the second has type boolType and
// the third has type errType.
// NOTE(review): boolType and errType are declared elsewhere in the package —
// presumably the reflect types of bool and error; confirm.
func isIterFunction2(fn reflect.Value) bool {
	sig := fn.Type()
	if sig.Kind() != reflect.Func {
		return false
	}
	if sig.NumIn() != 0 || sig.NumOut() != 3 {
		return false
	}
	if sig.Out(1) != boolType {
		return false
	}
	return sig.Out(2) == errType
}
/* slice stream */

// sliceSource is a Source that walks a slice or array held in a
// reflect.Value from index 0 upwards.
type sliceSource struct {
	elemType reflect.Type  // type reported by ElemType
	arr      reflect.Value // slice or array being iterated
	offset   int           // index of the next element to hand out
}

// newSliceSource wraps arr in a Source positioned at its first element.
// elemTyp is stored as given and returned unchanged by ElemType.
func newSliceSource(elemTyp reflect.Type, arr reflect.Value) Source {
	src := new(sliceSource)
	src.elemType = elemTyp
	src.arr = arr
	return src
}

// ElemType returns the element type supplied at construction.
func (ss *sliceSource) ElemType() reflect.Type {
	return ss.elemType
}

// Next returns the element at the current offset and advances by one. Once
// the offset reaches the length it returns (reflect.Value{}, false).
func (ss *sliceSource) Next() (reflect.Value, bool) {
	idx := ss.offset
	if idx < ss.arr.Len() {
		ss.offset = idx + 1
		return ss.arr.Index(idx), true
	}
	return reflect.Value{}, false
}
/* channel stream */

// channelSource is a Source that receives its elements from a channel held
// in a reflect.Value.
type channelSource struct {
	elemType reflect.Type  // type reported by ElemType
	ch       reflect.Value // channel to receive from
}

// newChannelSource wraps ch in a Source. elemTyp is stored as given and
// returned unchanged by ElemType.
func newChannelSource(elemTyp reflect.Type, ch reflect.Value) Source {
	return &channelSource{
		elemType: elemTyp,
		ch:       ch,
	}
}

// ElemType returns the element type supplied at construction.
func (cs *channelSource) ElemType() reflect.Type {
	return cs.elemType
}

// Next blocks until a value can be received from the channel and returns it
// with true. When the channel is closed and drained it returns
// (reflect.Value{}, false).
func (cs *channelSource) Next() (reflect.Value, bool) {
	// Value.Recv is the direct blocking receive; it avoids building a
	// one-element []reflect.SelectCase and running reflect.Select per call.
	recv, ok := cs.ch.Recv()
	if !ok {
		return reflect.Value{}, false
	}
	return recv, true
}
// lineSource is a Source that yields the lines of an io.Reader as strings,
// using a bufio.Scanner with its default (line) split function.
type lineSource struct {
	s       *bufio.Scanner
	elemTyp reflect.Type
}

// NewLineSource returns a Source that produces one string element per line
// read from r.
//
// Next reports false both at end of input and when scanning fails (a read
// error, or a line exceeding the scanner's token limit). Because the Source
// interface carries no error result, the returned value additionally has an
// Err method that callers can reach with a type assertion:
//
//	if e, ok := src.(interface{ Err() error }); ok {
//		err := e.Err()
//	}
func NewLineSource(r io.Reader) Source {
	return &lineSource{
		s:       bufio.NewScanner(r),
		elemTyp: reflect.TypeOf(""),
	}
}

// ElemType returns the string type.
func (ls *lineSource) ElemType() reflect.Type {
	return ls.elemTyp
}

// Next returns the next line without its line terminator and true, or
// (reflect.Value{}, false) once the scanner stops. Check Err afterwards to
// tell a clean end of input from a failure.
func (ls *lineSource) Next() (reflect.Value, bool) {
	if ls.s.Scan() {
		return reflect.ValueOf(ls.s.Text()), true
	}
	return reflect.Value{}, false
}

// Err returns the first non-EOF error encountered by the underlying scanner,
// or nil if scanning ended cleanly or has not failed so far.
func (ls *lineSource) Err() error {
	return ls.s.Err()
}