Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the protocol analyze logical #156

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions pkg/accesslog/collector/protocols/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package protocols

import (
"time"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
)

type PartitionConnection struct {
connectionID, randomID uint64
dataBuffer *buffer.Buffer
protocol map[enums.ConnectionProtocol]bool
protocolAnalyzer map[enums.ConnectionProtocol]Protocol
protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics
closed bool
closeCallback common.ConnectionProcessFinishCallback
skipAllDataAnalyze bool
lastCheckCloseTime time.Time
}

func (p *PartitionConnection) Metrics(protocol enums.ConnectionProtocol) ProtocolMetrics {
return p.protocolMetrics[protocol]
}

func (p *PartitionConnection) IsExistProtocol(protocol enums.ConnectionProtocol) bool {
_, exist := p.protocol[protocol]
return exist
}

func (p *PartitionConnection) Buffer() *buffer.Buffer {
return p.dataBuffer
}

func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail events.SocketDetail) {
if p.skipAllDataAnalyze {
// if the connection is already skip all data analyze, then just send the detail event
forwarder.SendTransferNoProtocolEvent(ctx, detail)
return
}
p.dataBuffer.AppendDetailEvent(detail)
}

func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) {
if p.skipAllDataAnalyze {
return
}
p.dataBuffer.AppendDataEvent(data)
}
46 changes: 46 additions & 0 deletions pkg/accesslog/collector/protocols/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package protocols

import v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"

// TransformHTTPMethod transforms the http method to the v3.AccessLogHTTPProtocolRequestMethod
func TransformHTTPMethod(method string) v3.AccessLogHTTPProtocolRequestMethod {
switch method {
case "GET":
return v3.AccessLogHTTPProtocolRequestMethod_Get
case "POST":
return v3.AccessLogHTTPProtocolRequestMethod_Post
case "PUT":
return v3.AccessLogHTTPProtocolRequestMethod_Put
case "DELETE":
return v3.AccessLogHTTPProtocolRequestMethod_Delete
case "HEAD":
return v3.AccessLogHTTPProtocolRequestMethod_Head
case "OPTIONS":
return v3.AccessLogHTTPProtocolRequestMethod_Options
case "TRACE":
return v3.AccessLogHTTPProtocolRequestMethod_Trace
case "CONNECT":
return v3.AccessLogHTTPProtocolRequestMethod_Connect
case "PATCH":
return v3.AccessLogHTTPProtocolRequestMethod_Patch
}
http1Log.Warnf("unknown http method: %s", method)
return v3.AccessLogHTTPProtocolRequestMethod_Get
}
118 changes: 51 additions & 67 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,45 +34,52 @@ import (

var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")

func init() {
registeredProtocols[enums.ConnectionProtocolHTTP] = func(ctx *common.AccessLogContext) Protocol {
return &HTTP1Protocol{ctx: ctx}
}
}
type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response)

type HTTP1Protocol struct {
ctx *common.AccessLogContext
ctx *common.AccessLogContext
analyze HTTP1ProtocolAnalyze
}

func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyze) *HTTP1Protocol {
protocol := &HTTP1Protocol{ctx: ctx}
if analyze == nil {
protocol.analyze = protocol.HandleHTTPData
} else {
protocol.analyze = analyze
}
return protocol
}

type HTTP1Metrics struct {
connectionID uint64
randomID uint64
ConnectionID uint64
RandomID uint64

halfRequests *list.List
}

func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics {
return &HTTP1Metrics{
connectionID: connectionID,
randomID: randomID,
ConnectionID: connectionID,
RandomID: randomID,
halfRequests: list.New(),
}
}

func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer, _ *AnalyzeHelper) error {
http1Metrics := metrics.(*HTTP1Metrics)
func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error {
metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
http1Metrics.connectionID, http1Metrics.randomID, buf.DataLength())
buf.ResetForLoopReading()
metrics.ConnectionID, metrics.RandomID, connection.Buffer().DataLength())
connection.Buffer().ResetForLoopReading()
for {
if !buf.PrepareForReading() {
if !connection.Buffer().PrepareForReading() {
return nil
}

messageType, err := reader.IdentityMessageType(buf)
messageType, err := reader.IdentityMessageType(connection.Buffer())
if err != nil {
http1Log.Debugf("failed to identity message type, %v", err)
if buf.SkipCurrentElement() {
if connection.Buffer().SkipCurrentElement() {
break
}
continue
Expand All @@ -81,19 +88,19 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer, _ *
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
result, _ = p.handleRequest(metrics, buf)
result, _ = p.handleRequest(metrics, connection.Buffer())
case reader.MessageTypeResponse:
result, _ = p.handleResponse(metrics, buf)
result, _ = p.handleResponse(metrics, connection.Buffer())
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}

finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = buf.RemoveReadElements()
finishReading = connection.Buffer().RemoveReadElements()
case enums.ParseResultSkipPackage:
finishReading = buf.SkipCurrentElement()
finishReading = connection.Buffer().SkipCurrentElement()
}

if finishReading {
Expand All @@ -103,32 +110,35 @@ func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer, _ *
return nil
}

func (p *HTTP1Protocol) handleRequest(metrics ProtocolMetrics, buf *buffer.Buffer) (enums.ParseResult, error) {
func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol {
return enums.ConnectionProtocolHTTP
}

func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer) (enums.ParseResult, error) {
req, result, err := reader.ReadRequest(buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
}
if result != enums.ParseResultSuccess {
return result, nil
}
metrics.(*HTTP1Metrics).appendRequestToList(req)
metrics.appendRequestToList(req)
return result, nil
}

func (p *HTTP1Protocol) handleResponse(metrics ProtocolMetrics, b *buffer.Buffer) (enums.ParseResult, error) {
http1Metrics := metrics.(*HTTP1Metrics)
firstRequest := http1Metrics.halfRequests.Front()
func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) {
firstRequest := metrics.halfRequests.Front()
if firstRequest == nil {
return enums.ParseResultSkipPackage, nil
}
request := http1Metrics.halfRequests.Remove(firstRequest).(*reader.Request)
request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)

// parsing response
response, result, err := reader.ReadResponse(request, b, true)
defer func() {
// if parsing response failed, then put the request back to the list
if result != enums.ParseResultSuccess {
http1Metrics.halfRequests.PushFront(request)
metrics.halfRequests.PushFront(request)
}
}()
if err != nil {
Expand All @@ -138,31 +148,31 @@ func (p *HTTP1Protocol) handleResponse(metrics ProtocolMetrics, b *buffer.Buffer
}

// getting the request and response, then send to the forwarder
p.handleHTTPData(http1Metrics, request, response)
p.analyze(metrics, request, response)
return enums.ParseResultSuccess, nil
}

func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) {
func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) {
detailEvents := make([]events.SocketDetail, 0)
detailEvents = appendSocketDetailsFromBuffer(detailEvents, request.HeaderBuffer())
detailEvents = appendSocketDetailsFromBuffer(detailEvents, request.BodyBuffer())
detailEvents = appendSocketDetailsFromBuffer(detailEvents, response.HeaderBuffer())
detailEvents = appendSocketDetailsFromBuffer(detailEvents, response.BodyBuffer())
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.HeaderBuffer())
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.BodyBuffer())
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.HeaderBuffer())
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.BodyBuffer())

if len(detailEvents) == 0 {
http1Log.Warnf("cannot found any detail events for HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d-%d",
metrics.connectionID, metrics.randomID,
metrics.ConnectionID, metrics.RandomID,
request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID())
return
}
http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events , connection ID: %d, random ID: %d",
len(detailEvents), metrics.connectionID, metrics.randomID)
len(detailEvents), metrics.ConnectionID, metrics.RandomID)
originalRequest := request.Original()
originalResponse := response.Original()

defer func() {
p.closeStream(originalRequest.Body)
p.closeStream(originalResponse.Body)
p.CloseStream(originalRequest.Body)
p.CloseStream(originalResponse.Body)
}()
forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Expand All @@ -171,12 +181,11 @@ func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request *reader.Re
EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
Version: v3.AccessLogHTTPProtocolVersion_HTTP1,
Request: &v3.AccessLogHTTPProtocolRequest{
Method: transformHTTPMethod(originalRequest.Method),
Method: TransformHTTPMethod(originalRequest.Method),
Path: originalRequest.URL.Path,
SizeOfHeadersBytes: uint64(request.HeaderBuffer().DataSize()),
SizeOfBodyBytes: uint64(request.BodyBuffer().DataSize()),

Trace: analyzeTraceInfo(func(key string) string {
Trace: AnalyzeTraceInfo(func(key string) string {
return originalRequest.Header.Get(key)
}, http1Log),
},
Expand All @@ -190,37 +199,12 @@ func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request *reader.Re
})
}

func (p *HTTP1Protocol) closeStream(ioReader io.Closer) {
func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
if ioReader != nil {
_ = ioReader.Close()
}
}

func transformHTTPMethod(method string) v3.AccessLogHTTPProtocolRequestMethod {
switch method {
case "GET":
return v3.AccessLogHTTPProtocolRequestMethod_Get
case "POST":
return v3.AccessLogHTTPProtocolRequestMethod_Post
case "PUT":
return v3.AccessLogHTTPProtocolRequestMethod_Put
case "DELETE":
return v3.AccessLogHTTPProtocolRequestMethod_Delete
case "HEAD":
return v3.AccessLogHTTPProtocolRequestMethod_Head
case "OPTIONS":
return v3.AccessLogHTTPProtocolRequestMethod_Options
case "TRACE":
return v3.AccessLogHTTPProtocolRequestMethod_Trace
case "CONNECT":
return v3.AccessLogHTTPProtocolRequestMethod_Connect
case "PATCH":
return v3.AccessLogHTTPProtocolRequestMethod_Patch
}
http1Log.Warnf("unknown http method: %s", method)
return v3.AccessLogHTTPProtocolRequestMethod_Get
}

func (m *HTTP1Metrics) appendRequestToList(req *reader.Request) {
if m.halfRequests.Len() == 0 {
m.halfRequests.PushFront(req)
Expand Down
Loading
Loading