-
Notifications
You must be signed in to change notification settings - Fork 0
/
mux.go
73 lines (63 loc) · 1.86 KB
/
mux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package mxwriter
import (
"bytes"
"errors"
"io"
"strconv"
)
// Separator is the key used to separate when writing
// the length of the key from the content
var Separator []byte = []byte(";")
// mux is the internal implementation of the multiplexer
type mux struct {
buffers map[string]*bytes.Buffer
keys []string
}
// NewMux returns a Multiplexer io.ReadWriter which
// can be used to write different streams of data
// and later on read them separately by key.
// To read from an specific key use the NewDemux
// function to get the Demux
// NOTE: Not safe for concurrent use
func NewMux() io.ReadWriter {
return &mux{
buffers: make(map[string]*bytes.Buffer),
}
}
// Write writes the content of p to the internal buffer.
// The format has to be `<length-key>;<key><content>`
func (w *mux) Write(p []byte) (int, error) {
lenidx := bytes.Index(p, Separator)
if lenidx == -1 {
return 0, errors.New("invalid write format")
}
l, err := strconv.Atoi(string(p[:lenidx]))
if err != nil {
return 0, err
}
key := string(p[lenidx+1 : lenidx+l+1])
p = p[lenidx+l+1:]
if buff, ok := w.buffers[key]; ok {
n, err := io.Copy(buff, bytes.NewReader(p))
return int(n), err
}
w.buffers[key] = bytes.NewBuffer(p)
w.keys = append(w.keys, key)
return len(p), nil
}
// Read will basically read everything written, the
// order of content is based on the order of the Keys
// written using w.Write
func (w *mux) Read(p []byte) (int, error) {
readers := make([]io.Reader, 0)
for _, k := range w.keys {
readers = append(readers, w.buffers[k])
}
return io.MultiReader(readers...).Read(p)
}
// Write is a helper that automatically writes with the expected format to the w
func Write(w io.Writer, key string, content []byte) (int, error) {
l := len(key)
content = append(bytes.Join([][]byte{[]byte(strconv.Itoa(l)), Separator, []byte(key)}, nil), content...)
return w.Write(content)
}