-
Notifications
You must be signed in to change notification settings - Fork 2
/
processor_test.go
56 lines (45 loc) · 1.11 KB
/
processor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package kawa_test
import (
"context"
"fmt"
"testing"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/x/memory"
)
type BinString string
func (bs *BinString) MarshalBinary() ([]byte, error) {
return []byte(*bs), nil
}
func (bs *BinString) UnmarshalBinary(bts []byte) error {
*bs = BinString(bts[:])
return nil
}
func TestProcessor(t *testing.T) {
inC, outC := make(chan *BinString), make(chan *BinString)
memSrc := memory.MemorySource[*BinString]{
MsgC: inC,
}
memDst := memory.MemoryDestination[*BinString]{
MsgC: outC,
}
countMessages := kawa.HandlerFunc[*BinString, *BinString](
func(c context.Context, m kawa.Message[*BinString]) ([]kawa.Message[*BinString], error) {
return []kawa.Message[*BinString]{m}, nil
})
p, _ := kawa.New[*BinString, *BinString](kawa.Config[*BinString, *BinString]{
Source: memSrc,
Destination: memDst,
Handler: (countMessages),
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
err := p.Run(ctx)
fmt.Println(err)
}()
for i := 0; i < 10; i++ {
bs := BinString("hi")
inC <- &bs
fmt.Println(<-outC)
}
}