Skip to content

Commit

Permalink
Support analyze multiple protocol in the same connection (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Nov 28, 2024
1 parent 17f479e commit e73c782
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 50 deletions.
12 changes: 6 additions & 6 deletions pkg/accesslog/collector/protocols/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (

type PartitionConnection struct {
connectionID, randomID uint64
dataBuffer *buffer.Buffer
protocol map[enums.ConnectionProtocol]bool
dataBuffers map[enums.ConnectionProtocol]*buffer.Buffer
protocol map[enums.ConnectionProtocol]uint64 // protocol with minimal data id
protocolAnalyzer map[enums.ConnectionProtocol]Protocol
protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics
closed bool
Expand All @@ -48,8 +48,8 @@ func (p *PartitionConnection) IsExistProtocol(protocol enums.ConnectionProtocol)
return exist
}

func (p *PartitionConnection) Buffer() *buffer.Buffer {
return p.dataBuffer
func (p *PartitionConnection) Buffer(protocol enums.ConnectionProtocol) *buffer.Buffer {
return p.dataBuffers[protocol]
}

func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail events.SocketDetail) {
Expand All @@ -58,12 +58,12 @@ func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail
forwarder.SendTransferNoProtocolEvent(ctx, detail)
return
}
p.dataBuffer.AppendDetailEvent(detail)
p.dataBuffers[detail.GetProtocol()].AppendDetailEvent(detail)
}

func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) {
if p.skipAllDataAnalyze {
return
}
p.dataBuffer.AppendDataEvent(data)
p.dataBuffers[data.Protocol()].AppendDataEvent(data)
}
19 changes: 10 additions & 9 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,19 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) Protoc

func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error {
metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
metrics.ConnectionID, metrics.RandomID, connection.Buffer().DataLength())
connection.Buffer().ResetForLoopReading()
metrics.ConnectionID, metrics.RandomID, buf.DataLength())
buf.ResetForLoopReading()
for {
if !connection.Buffer().PrepareForReading() {
if !buf.PrepareForReading() {
return nil
}

messageType, err := reader.IdentityMessageType(connection.Buffer())
messageType, err := reader.IdentityMessageType(buf)
if err != nil {
http1Log.Debugf("failed to identity message type, %v", err)
if connection.Buffer().SkipCurrentElement() {
if buf.SkipCurrentElement() {
break
}
continue
Expand All @@ -88,19 +89,19 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
result, _ = p.handleRequest(metrics, connection.Buffer())
result, _ = p.handleRequest(metrics, buf)
case reader.MessageTypeResponse:
result, _ = p.handleResponse(metrics, connection.Buffer())
result, _ = p.handleResponse(metrics, buf)
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}

finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = connection.Buffer().RemoveReadElements()
finishReading = buf.RemoveReadElements()
case enums.ParseResultSkipPackage:
finishReading = connection.Buffer().SkipCurrentElement()
finishReading = buf.SkipCurrentElement()
}

if finishReading {
Expand Down
21 changes: 11 additions & 10 deletions pkg/accesslog/collector/protocols/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,20 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) Protoc

func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error {
http2Metrics := connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: %d, random ID: %d",
http2Metrics.connectionID, http2Metrics.randomID)
connection.Buffer().ResetForLoopReading()
buf.ResetForLoopReading()
for {
if !connection.Buffer().PrepareForReading() {
if !buf.PrepareForReading() {
return nil
}

startPosition := connection.Buffer().Position()
header, err := http2.ReadFrameHeader(connection.Buffer())
startPosition := buf.Position()
header, err := http2.ReadFrameHeader(buf)
if err != nil {
http2Log.Debugf("failed to read frame header, %v", err)
if connection.Buffer().SkipCurrentElement() {
if buf.SkipCurrentElement() {
break
}
continue
Expand All @@ -112,12 +113,12 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, connection.Buffer())
result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, buf)
case http2.FrameData:
result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, connection.Buffer())
result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, buf)
default:
tmp := make([]byte, header.Length)
if err := connection.Buffer().ReadUntilBufferFull(tmp); err != nil {
if err := buf.ReadUntilBufferFull(tmp); err != nil {
if errors.Is(err, buffer.ErrNotComplete) {
result = enums.ParseResultSkipPackage
} else {
Expand All @@ -139,9 +140,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = connection.Buffer().RemoveReadElements()
finishReading = buf.RemoveReadElements()
case enums.ParseResultSkipPackage:
finishReading = connection.Buffer().SkipCurrentElement()
finishReading = buf.SkipCurrentElement()
}

if finishReading {
Expand Down
63 changes: 43 additions & 20 deletions pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"os"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -119,25 +120,30 @@ type PartitionContext struct {
analyzeLocker sync.Mutex
}

func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
connection := &PartitionConnection{
connectionID: conID,
randomID: randomID,
dataBuffer: buffer.NewBuffer(),
protocol: make(map[enums.ConnectionProtocol]bool),
dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer),
protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics),
}
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol)
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID)
return connection
}

func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) {
if _, exist := p.protocol[protocol]; !exist {
func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) {
if minDataID, exist := p.protocol[protocol]; !exist {
analyzer := protocolMgr.GetProtocol(protocol)
p.protocol[protocol] = true
p.protocol[protocol] = currentDataID
p.dataBuffers[protocol] = buffer.NewBuffer()
p.protocolAnalyzer[protocol] = analyzer
p.protocolMetrics[protocol] = analyzer.GenerateConnection(conID, randomID)
} else if currentDataID < minDataID {
p.protocol[protocol] = currentDataID
}
}

Expand Down Expand Up @@ -212,26 +218,27 @@ func (p *PartitionContext) Consume(data interface{}) {
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol())
connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID())
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol)
connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol)
event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol0)
connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
}

func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
if exist {
connection := conn.(*PartitionConnection)
connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol)
connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol, currentDataID)
return connection
}
result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol)
result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol, currentDataID)
p.connections.Set(conKey, result)
return result
}
Expand All @@ -254,7 +261,10 @@ func (p *PartitionContext) processEvents() {
p.processConnectionEvents(info)

// if the connection already closed and not contains any buffer data, then delete the connection
bufLen := info.dataBuffer.DataLength()
var bufLen = 0
for _, buf := range info.dataBuffers {
bufLen += buf.DataLength()
}
if bufLen > 0 {
return
}
Expand Down Expand Up @@ -309,9 +319,11 @@ func (p *PartitionContext) processExpireEvents() {
}

func (p *PartitionContext) processConnectionExpireEvents(connection *PartitionConnection) {
if c := connection.dataBuffer.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c,
connection.connectionID, connection.randomID)
for _, buf := range connection.dataBuffers {
if c := buf.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c,
connection.connectionID, connection.randomID)
}
}
}

Expand All @@ -320,8 +332,17 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti
return
}
helper := &AnalyzeHelper{}
for protocol, analyzer := range connection.protocolAnalyzer {
if err := analyzer.Analyze(connection, helper); err != nil {

// since the socket data/detail are getting unsorted, so rover need to using the minimal data id to analyze to ensure the order
sortedProtocols := make([]enums.ConnectionProtocol, 0, len(connection.protocol))
for protocol := range connection.protocol {
sortedProtocols = append(sortedProtocols, protocol)
}
sort.Slice(sortedProtocols, func(i, j int) bool {
return connection.protocol[sortedProtocols[i]] < connection.protocol[sortedProtocols[j]]
})
for _, protocol := range sortedProtocols {
if err := connection.protocolAnalyzer[protocol].Analyze(connection, helper); err != nil {
log.Warnf("failed to analyze the %s protocol data: %v", enums.ConnectionProtocolString(protocol), err)
}
}
Expand All @@ -330,6 +351,8 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti
// notify the connection manager to skip analyze all data(just sending the detail)
connection.skipAllDataAnalyze = true
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, connection.randomID)
connection.dataBuffer.Clean()
for _, buf := range connection.dataBuffers {
buf.Clean()
}
}
}
6 changes: 5 additions & 1 deletion pkg/accesslog/events/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type SocketDataUploadEvent struct {
Protocol enums.ConnectionProtocol
Protocol0 enums.ConnectionProtocol
HaveReduce uint8
Direction0 enums.SocketDataDirection
Finished uint8
Expand All @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}

func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
return s.Protocol0
}

func (s *SocketDataUploadEvent) GenerateConnectionID() string {
return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/profiling/task/network/analyze/events/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type SocketDataUploadEvent struct {
Protocol enums.ConnectionProtocol
Protocol0 enums.ConnectionProtocol
HaveReduce uint8
Direction0 enums.SocketDataDirection
Finished uint8
Expand All @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}

func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
return s.Protocol0
}

func (s *SocketDataUploadEvent) GenerateConnectionID() string {
return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func (a *Analyzer) Start(ctx context.Context) {
}

func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent) {
analyzer := a.protocols[event.Protocol]
analyzer := a.protocols[event.Protocol()]
if analyzer == nil {
log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)",
event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol), event.Protocol)
event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol()), event.Protocol())
return
}
analyzer.ReceiveSocketData(a.ctx, event)
Expand Down
6 changes: 6 additions & 0 deletions pkg/tools/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ var (
)

type SocketDataBuffer interface {
// Protocol of the buffer
Protocol() enums.ConnectionProtocol
// GenerateConnectionID for identity the buffer belong which connection
GenerateConnectionID() string
// BufferData of the buffer
Expand Down Expand Up @@ -88,6 +90,10 @@ type SocketDataEventLimited struct {
Size int
}

func (s *SocketDataEventLimited) Protocol() enums.ConnectionProtocol {
return s.SocketDataBuffer.Protocol()
}

func (s *SocketDataEventLimited) BufferData() []byte {
return s.SocketDataBuffer.BufferData()[s.From:s.Size]
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tools/ssl/gotls.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (r *Register) generateGOTLSSymbolOffsets(register *Register, elfFile *elf.F

sym := register.SearchSymbol(func(a, b string) bool {
return a == b
}, "go.itab.*net.TCPConn,net.Conn")
}, "go.itab.*net.TCPConn,net.Conn", "go:itab.*net.TCPConn,net.Conn")
if sym == nil {
log.Warnf("could not found the tcp connection symbol: go.itab.*net.TCPConn,net.Conn")
return nil, nil
Expand Down

0 comments on commit e73c782

Please sign in to comment.