Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Track payload sizes of incoming frames
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-sucha committed Sep 21, 2023
1 parent 3975e7d commit a368c8c
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 27 deletions.
4 changes: 4 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ type ClusterConfig struct {
// Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver.
FrameHeaderObserver FrameHeaderObserver

// FrameObserver will be notified of all received frames that were read.
// FrameObserver will not see frames that were discarded.
FrameObserver FrameObserver

// StreamObserver will be notified of stream state changes.
// This can be used to track in-flight protocol requests and responses.
StreamObserver StreamObserver
Expand Down
65 changes: 44 additions & 21 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,12 @@ type Conn struct {
r *bufio.Reader
w contextWriter

timeout time.Duration
writeTimeout time.Duration
cfg *ConnConfig
frameObserver FrameHeaderObserver
streamObserver StreamObserver
timeout time.Duration
writeTimeout time.Duration
cfg *ConnConfig
frameHeaderObserver FrameHeaderObserver
frameObserver FrameObserver
streamObserver StreamObserver

headerBuf [maxFrameHeaderSize]byte

Expand Down Expand Up @@ -279,19 +280,20 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *

ctx, cancel := context.WithCancel(ctx)
c := &Conn{
conn: dialedHost.Conn,
r: bufio.NewReader(dialedHost.Conn),
cfg: cfg,
calls: make(map[int]*callReq),
version: uint8(cfg.ProtoVersion),
addr: dialedHost.Conn.RemoteAddr().String(),
errorHandler: errorHandler,
compressor: cfg.Compressor,
session: s,
streams: s.streamIDGenerator(cfg.ProtoVersion),
host: host,
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
frameObserver: s.frameObserver,
conn: dialedHost.Conn,
r: bufio.NewReader(dialedHost.Conn),
cfg: cfg,
calls: make(map[int]*callReq),
version: uint8(cfg.ProtoVersion),
addr: dialedHost.Conn.RemoteAddr().String(),
errorHandler: errorHandler,
compressor: cfg.Compressor,
session: s,
streams: s.streamIDGenerator(cfg.ProtoVersion),
host: host,
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
frameHeaderObserver: s.frameHeaderObserver,
frameObserver: s.frameObserver,
w: &deadlineContextWriter{
w: dialedHost.Conn,
timeout: writeTimeout,
Expand Down Expand Up @@ -713,8 +715,9 @@ func (c *Conn) recv(ctx context.Context) error {
return err
}

if c.frameObserver != nil {
c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{
var parseObserver frameParseObserver
if c.frameHeaderObserver != nil || c.frameObserver != nil {
observedHeader := ObservedFrameHeader{
Version: protoVersion(head.version),
Flags: head.flags,
Stream: int16(head.stream),
Expand All @@ -723,7 +726,18 @@ func (c *Conn) recv(ctx context.Context) error {
Start: headStartTime,
End: headEndTime,
Host: c.host,
})
}

if c.frameHeaderObserver != nil {
c.frameHeaderObserver.ObserveFrameHeader(context.Background(), observedHeader)
}

if c.frameObserver != nil {
parseObserver = frameParseObserver{
head: observedHeader,
frameObserver: c.frameObserver,
}
}
}

if head.stream > c.streams.NumStreams {
Expand All @@ -734,6 +748,9 @@ func (c *Conn) recv(ctx context.Context) error {
if err := framer.readFrame(c, &head); err != nil {
return err
}
if c.frameObserver != nil {
framer.observer = parseObserver
}
go c.session.handleEvent(framer)
return nil
} else if head.stream <= 0 {
Expand All @@ -743,6 +760,9 @@ func (c *Conn) recv(ctx context.Context) error {
if err := framer.readFrame(c, &head); err != nil {
return err
}
if c.frameObserver != nil {
framer.observer = parseObserver
}

frame, err := framer.parseFrame()
if err != nil {
Expand Down Expand Up @@ -779,6 +799,9 @@ func (c *Conn) recv(ctx context.Context) error {
return err
}
}
if c.frameObserver != nil {
framer.observer = parseObserver
}

// we either, return a response to the caller, the caller timedout, or the
// connection has closed. Either way we should never block indefinatly here
Expand Down
61 changes: 61 additions & 0 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,31 @@ type FrameHeaderObserver interface {
ObserveFrameHeader(context.Context, ObservedFrameHeader)
}

// ObservedFrame describes a frame that was read (not discarded).
type ObservedFrame struct {
ObservedFrameHeader

// UncompressedSize is a size of the frame payload after decompression.
// See ObservedFrameHeader.Length to get the compressed size.
// UncompressedSize is zero if the frame was not compressed.
UncompressedSize int

// RowCount is count of result rows.
// Only set for RESULT frame with Rows kind.
RowCount int

// RowsSize is sum of sizes of all rows in the result.
// Only set for RESULT frame with Rows kind.
RowsSize int
}

// FrameObserver allows observing received frames.
//
// Experimental, this interface and use may change.
type FrameObserver interface {
ObserveFrame(context.Context, ObservedFrame)
}

// a framer is responsible for reading, writing and parsing frames on a single stream
type framer struct {
proto byte
Expand All @@ -353,6 +378,12 @@ type framer struct {
headSize int
// if this frame was read then the header will be here
header *frameHeader
// ucompressedSize is size of the frame payload after decompression.
// It is zero if the frame was not compressed.
// It will be set if this frame was read.
uncompressedSize int

observer frameParseObserver

// if tracing flag is set this is not nil
traceID []byte
Expand All @@ -369,6 +400,26 @@ type framer struct {
rateLimitingErrorCode int
}

type frameParseObserver struct {
head ObservedFrameHeader
frameObserver FrameObserver
}

func (fpo *frameParseObserver) observeFrame(ff *framer, f frame) {
if fpo.frameObserver == nil {
return
}
of := ObservedFrame{
ObservedFrameHeader: fpo.head,
UncompressedSize: ff.uncompressedSize,
}
if rows, ok := f.(resultRowsFrame); ok {
of.RowCount = rows.numRows
of.RowsSize = rows.rowsContentSize
}
fpo.frameObserver.ObserveFrame(context.TODO(), of)
}

func newFramer(compressor Compressor, version byte) *framer {
buf := make([]byte, defaultBufSize)
f := &framer{
Expand Down Expand Up @@ -527,6 +578,8 @@ func (f *framer) readFrame(r io.Reader, head *frameHeader) error {
if err != nil {
return err
}

f.uncompressedSize = len(f.buf)
}

f.header = head
Expand Down Expand Up @@ -581,6 +634,8 @@ func (f *framer) parseFrame() (frame frame, err error) {
return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
}

f.observer.observeFrame(f, frame)

return
}

Expand Down Expand Up @@ -1152,6 +1207,11 @@ type resultRowsFrame struct {
meta resultMetadata
// dont parse the rows here as we only need to do it once
numRows int
// rowsContentSize is size of the frame after row_count.
// It approximates the size of rows_content:
// Currently it measures rows_content,
// but theoretically more fields could be added after rows_content in the future.
rowsContentSize int
}

func (f *resultRowsFrame) String() string {
Expand All @@ -1166,6 +1226,7 @@ func (f *framer) parseResultRows() frame {
if result.numRows < 0 {
panic(fmt.Errorf("invalid row_count in result frame: %d", result.numRows))
}
result.rowsContentSize = len(f.buf)

return result
}
Expand Down
14 changes: 8 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type Session struct {
batchObserver BatchObserver
connectObserver ConnectObserver
disconnectObserver DisconnectObserver
frameObserver FrameHeaderObserver
frameHeaderObserver FrameHeaderObserver
frameObserver FrameObserver
streamObserver StreamObserver
hostSource *ringDescriber
ringRefresher *refreshDebouncer
Expand Down Expand Up @@ -175,7 +176,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
s.queryObserver = cfg.QueryObserver
s.batchObserver = cfg.BatchObserver
s.connectObserver = cfg.ConnectObserver
s.frameObserver = cfg.FrameHeaderObserver
s.frameHeaderObserver = cfg.FrameHeaderObserver
s.frameObserver = cfg.FrameObserver
s.streamObserver = cfg.StreamObserver

//Check the TLS Config before trying to connect to anything external
Expand Down Expand Up @@ -2145,10 +2147,10 @@ type routingKeyInfoLRU struct {
}

type routingKeyInfo struct {
indexes []int
types []TypeInfo
keyspace string
table string
indexes []int
types []TypeInfo
keyspace string
table string
lwt bool
partitioner partitioner
}
Expand Down

0 comments on commit a368c8c

Please sign in to comment.