-
Notifications
You must be signed in to change notification settings - Fork 61
/
frame.go
124 lines (100 loc) · 3.15 KB
/
frame.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package qrpc
import (
"context"
)
// Frame models a qrpc frame
// all fields are readly only
type Frame struct {
RequestID uint64
Flags FrameFlag
Cmd Cmd
Payload []byte
Stream *Stream // non nil for the first frame in stream
stash interface{} // store per frame stuff, mainly used by middleware
}
// FrameCh get the next frame ch
func (r *Frame) FrameCh() <-chan *Frame {
return r.Stream.frameCh
}
// Context returns the request's context.
//
// The returned context is always non-nil;
//
// For outgoing client requests, the context controls cancelation.
//
// For incoming server requests, the context is canceled when the
// client's connection closes, the request is canceled ,
// or when the ServeQRPC method returns. (TODO)
func (r *Frame) Context() context.Context {
return r.Stream.ctx
}
// FromClient returns true if frame is from clientconn
func (r *Frame) FromClient() bool {
// RequestID odd means come from client
return r.RequestID%2 == 1
}
// FromServer returns true if frame is from serveconn
func (r *Frame) FromServer() bool {
// RequestID even means com from server
return r.RequestID%2 == 0
}
// RequestFrame is client->server
type RequestFrame Frame
// ConnectionInfo returns the underlying ConnectionInfo
func (r *RequestFrame) ConnectionInfo() *ConnectionInfo {
return r.Stream.ctx.Value(ConnectionInfoKey).(*ConnectionInfo)
}
// ClientConnectionInfo returns the underlying ClientConnectionInfo
func (r *RequestFrame) ClientConnectionInfo() *ClientConnectionInfo {
return r.Stream.ctx.Value(ClientConnectionInfoKey).(*ClientConnectionInfo)
}
// GetStash fetches frame stash
func (r *RequestFrame) GetStash() interface{} {
return r.stash
}
// SetStash to set frame stash
func (r *RequestFrame) SetStash(stash interface{}) {
r.stash = stash
}
// Close the underlying connection
func (r *RequestFrame) Close() error {
if r.FromClient() {
ci := r.Stream.ctx.Value(ConnectionInfoKey).(*ConnectionInfo)
return ci.serveconn.Close()
}
cci, ok := r.Stream.ctx.Value(ClientConnectionInfoKey).(*ClientConnectionInfo)
// this is for compatibility with old qrpc client
if !ok {
ci := r.Stream.ctx.Value(ConnectionInfoKey).(*ConnectionInfo)
return ci.serveconn.Close()
}
cci.CC.closeRWC()
return nil
}
// FromClient returns true if frame is from clientconn
func (r *RequestFrame) FromClient() bool {
return (*Frame)(r).FromClient()
}
// Context for RequestFrame
func (r *RequestFrame) Context() context.Context {
return (*Frame)(r).Context()
}
// FrameCh for RequestFrame
func (r *RequestFrame) FrameCh() <-chan *Frame {
return (*Frame)(r).FrameCh()
}
// StreamInitiator for stream initiating side, may also be from server side
// implemented by both Connection and serverconn
type StreamInitiator interface {
StreamRequest(cmd Cmd, flags FrameFlag, payload []byte) (StreamWriter, Response, error)
IsClosed() bool
}
// StreamInitiator returns the underlying StreamInitiator
func (r *RequestFrame) StreamInitiator() StreamInitiator {
ci, ok := r.Stream.ctx.Value(ConnectionInfoKey).(*ConnectionInfo)
if ok {
return ci.serveconn
}
cci := r.Stream.ctx.Value(ClientConnectionInfoKey).(*ClientConnectionInfo)
return cci.CC
}