Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to support unknown IEs when decoding template sets #380

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 69 additions & 16 deletions pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,32 @@ import (
"github.com/vmware/go-ipfix/pkg/util"
)

// DecodingMode specifies how unknown information elements (in templates) are handled when decoding.
// Unknown information elements are elements which are not part of the static registry included with
// the library.
// Note that regardless of the DecodingMode, data sets must always match the corresponding template.
type DecodingMode string

const (
// DecodingModeStrict will cause decoding to fail when an unknown IE is encountered in a template.
DecodingModeStrict DecodingMode = "Strict"
// DecodingModeLenientKeepUnknown will accept unknown IEs in templates. When decoding the
// corresponding field in a data record, the value will be preserved (as an octet array).
DecodingModeLenientKeepUnknown DecodingMode = "LenientKeepUnknown"
// DecodingModeLenientDropUnknown will accept unknown IEs in templates. When decoding the
// corresponding field in a data record, the value will be dropped (information element will
// not be present in the resulting Record). Be careful when using this mode as the IEs
// included in the resulting Record will no longer match the received template.
DecodingModeLenientDropUnknown DecodingMode = "LenientDropUnknown"
)

type CollectingProcess struct {
// for each obsDomainID, there is a map of templates
templatesMap map[uint32]map[uint16][]*entities.InfoElement
// mutex allows multiple readers or one writer at the same time
mutex sync.RWMutex
// template lifetime
templateTTL uint32
templateTTL time.Duration
// server information
address string
// server protocol
Expand All @@ -57,6 +76,9 @@ type CollectingProcess struct {
// numExtraElements specifies number of elements that could be added after
// decoding the IPFIX data packet.
numExtraElements int
// decodingMode specifies how unknown information elements (in templates) are handled when
// decoding.
decodingMode DecodingMode
// caCert, serverCert and serverKey are for storing encryption info when using TLS/DTLS
caCert []byte
serverCert []byte
Expand All @@ -80,6 +102,10 @@ type CollectorInput struct {
ServerCert []byte
ServerKey []byte
NumExtraElements int
// DecodingMode specifies how unknown information elements (in templates) are handled when
// decoding. The default value is DecodingModeStrict for historical reasons. For most uses,
// DecodingModeLenientKeepUnknown is the most appropriate mode.
DecodingMode DecodingMode
}

type clientHandler struct {
Expand All @@ -88,10 +114,24 @@ type clientHandler struct {
}

func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) {
templateTTLSeconds := input.TemplateTTL
if input.Protocol == "udp" && templateTTLSeconds == 0 {
templateTTLSeconds = entities.TemplateTTL
}
templateTTL := time.Duration(templateTTLSeconds) * time.Second
decodingMode := input.DecodingMode
if decodingMode == "" {
decodingMode = DecodingModeStrict
}
klog.InfoS(
"Initializing the collecting process",
"encrypted", input.IsEncrypted, "address", input.Address, "protocol", input.Protocol, "maxBufferSize", input.MaxBufferSize,
"templateTTL", templateTTL, "numExtraElements", input.NumExtraElements, "decodingMode", decodingMode,
)
collectProc := &CollectingProcess{
templatesMap: make(map[uint32]map[uint16][]*entities.InfoElement),
mutex: sync.RWMutex{},
templateTTL: input.TemplateTTL,
templateTTL: templateTTL,
address: input.Address,
protocol: input.Protocol,
maxBufferSize: input.MaxBufferSize,
Expand All @@ -103,11 +143,13 @@ func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) {
serverCert: input.ServerCert,
serverKey: input.ServerKey,
numExtraElements: input.NumExtraElements,
decodingMode: decodingMode,
}
return collectProc, nil
}

func (cp *CollectingProcess) Start() {
klog.Info("Starting the collecting process")
if cp.protocol == "tcp" {
cp.startTCPServer()
} else if cp.protocol == "udp" {
Expand All @@ -119,7 +161,7 @@ func (cp *CollectingProcess) Stop() {
close(cp.stopChan)
// wait for all connections to be safely deleted and returned
cp.wg.Wait()
klog.Info("stopping the collecting process")
klog.Info("Stopping the collecting process")
}

func (cp *CollectingProcess) GetAddress() net.Addr {
Expand Down Expand Up @@ -228,7 +270,11 @@ func (cp *CollectingProcess) decodeTemplateSet(templateBuffer *bytes.Buffer, obs
enterpriseID = registry.IANAEnterpriseID
element, err = registry.GetInfoElementFromID(elementID, enterpriseID)
if err != nil {
return nil, err
if cp.decodingMode == DecodingModeStrict {
return nil, err
}
klog.InfoS("Template includes an information element that is not present in registry", "obsDomainID", obsDomainID, "templateID", templateID, "enterpriseID", enterpriseID, "elementID", elementID)
element = entities.NewInfoElement("", elementID, entities.OctetArray, enterpriseID, elementLength)
}
} else {
/*
Expand All @@ -254,7 +300,11 @@ func (cp *CollectingProcess) decodeTemplateSet(templateBuffer *bytes.Buffer, obs
elementID = binary.BigEndian.Uint16(elementid)
element, err = registry.GetInfoElementFromID(elementID, enterpriseID)
if err != nil {
return nil, err
if cp.decodingMode == DecodingModeStrict {
return nil, err
}
klog.InfoS("Template includes an information element that is not present in registry", "obsDomainID", obsDomainID, "templateID", templateID, "enterpriseID", enterpriseID, "elementID", elementID)
element = entities.NewInfoElement("", elementID, entities.OctetArray, enterpriseID, elementLength)
}
}
if elementsWithValue[i], err = entities.DecodeAndCreateInfoElementWithValue(element, nil); err != nil {
Expand All @@ -281,17 +331,24 @@ func (cp *CollectingProcess) decodeDataSet(dataBuffer *bytes.Buffer, obsDomainID
}

for dataBuffer.Len() > 0 {
elements := make([]entities.InfoElementWithValue, len(template), len(template)+cp.numExtraElements)
for i, element := range template {
elements := make([]entities.InfoElementWithValue, 0, len(template)+cp.numExtraElements)
for _, ie := range template {
var length int
if element.Len == entities.VariableLength { // string
if ie.Len == entities.VariableLength { // string / octet array
length = getFieldLength(dataBuffer)
} else {
length = int(element.Len)
length = int(ie.Len)
}
if elements[i], err = entities.DecodeAndCreateInfoElementWithValue(element, dataBuffer.Next(length)); err != nil {
element, err := entities.DecodeAndCreateInfoElementWithValue(ie, dataBuffer.Next(length))
if err != nil {
return nil, err
}
// A missing name means an unknown element was received
if cp.decodingMode == DecodingModeLenientDropUnknown && ie.Name == "" {
klog.V(5).InfoS("Dropping field for unknown information element", "obsDomainID", obsDomainID, "ie", ie)
continue
}
elements = append(elements, element)
}
err = dataSet.AddRecordV2(elements, templateID)
if err != nil {
Expand All @@ -313,16 +370,12 @@ func (cp *CollectingProcess) addTemplate(obsDomainID uint32, templateID uint16,
}
cp.templatesMap[obsDomainID][templateID] = elements
// template lifetime management
if cp.protocol == "tcp" {
if cp.protocol != "udp" {
return
}

// Handle udp template expiration
if cp.templateTTL == 0 {
cp.templateTTL = entities.TemplateTTL // Default value
}
go func() {
ticker := time.NewTicker(time.Duration(cp.templateTTL) * time.Second)
ticker := time.NewTicker(cp.templateTTL)
defer ticker.Stop()
select {
case <-ticker.C:
Expand Down
77 changes: 77 additions & 0 deletions pkg/collector/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"fmt"
"net"
"runtime"
"sync"
Expand All @@ -31,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/exporter"
"github.com/vmware/go-ipfix/pkg/registry"
testcerts "github.com/vmware/go-ipfix/pkg/test/certs"
)
Expand Down Expand Up @@ -89,6 +92,7 @@ func TestUDPCollectingProcess_ReceiveTemplateRecord(t *testing.T) {
t.Fatalf("UDP Collecting Process does not start correctly: %v", err)
}
go cp.Start()

// wait until collector is ready
waitForCollectorReady(t, cp)
collectorAddr := cp.GetAddress()
Expand Down Expand Up @@ -605,6 +609,79 @@ func TestUDPCollectingProcessIPv6(t *testing.T) {
assert.Equal(t, net.ParseIP("2001:0:3238:DFE1:63::FEFB"), ie.GetIPAddressValue())
}

// TestUnknownInformationElement validates that message decoding when dealing with unknown IEs (not
// part of the static registry included in this project). All 3 supported decoding modes are tested.
func TestUnknownInformationElement(t *testing.T) {
const (
templateID = 100
obsDomainID = 0xabcd
unknownID = 999
unknownValue = uint32(0x1234)
)

for _, enterpriseID := range []uint32{registry.IANAEnterpriseID, registry.AntreaEnterpriseID} {
for _, mode := range []DecodingMode{DecodingModeStrict, DecodingModeLenientKeepUnknown, DecodingModeLenientDropUnknown} {
t.Run(fmt.Sprintf("enterpriseID-%d_%s", enterpriseID, mode), func(t *testing.T) {
input := getCollectorInput(tcpTransport, false, false)
input.DecodingMode = mode
cp, err := InitCollectingProcess(input)
require.NoError(t, err)
defer cp.Stop()

go func() { // remove the message from the message channel
for range cp.GetMsgChan() {
}
}()

// First, send template set.

unknownIE := entities.NewInfoElement("foo", unknownID, entities.Unsigned32, enterpriseID, 4)
knownIE1, _ := registry.GetInfoElement("octetDeltaCount", registry.IANAEnterpriseID)
knownIE2, _ := registry.GetInfoElement("sourceNodeName", registry.AntreaEnterpriseID)
templateSet, err := entities.MakeTemplateSet(templateID, []*entities.InfoElement{knownIE1, unknownIE, knownIE2})
require.NoError(t, err)
templateBytes, err := exporter.CreateIPFIXMsg(templateSet, obsDomainID, 0 /* seqNumber */, time.Now())
require.NoError(t, err)
_, err = cp.decodePacket(bytes.NewBuffer(templateBytes), "1.2.3.4:12345")
// If decoding is strict, there will be an error and we need to stop the test.
if mode == DecodingModeStrict {
require.Error(t, err)
return
}
require.NoError(t, err)

// Second, send data set.

unknownIEWithValue := entities.NewUnsigned32InfoElement(unknownIE, unknownValue)
knownIE1WithValue := entities.NewUnsigned64InfoElement(knownIE1, 0x100)
knownIE2WithValue := entities.NewStringInfoElement(knownIE2, "node-1")
dataSet, err := entities.MakeDataSet(templateID, []entities.InfoElementWithValue{knownIE1WithValue, unknownIEWithValue, knownIE2WithValue})
require.NoError(t, err)
dataBytes, err := exporter.CreateIPFIXMsg(dataSet, obsDomainID, 1 /* seqNumber */, time.Now())
require.NoError(t, err)
msg, err := cp.decodePacket(bytes.NewBuffer(dataBytes), "1.2.3.4:12345")
require.NoError(t, err)
records := msg.GetSet().GetRecords()
require.Len(t, records, 1)
record := records[0]
ies := record.GetOrderedElementList()

if mode == DecodingModeLenientKeepUnknown {
require.Len(t, ies, 3)
// the unknown IE after decoding
ieWithValue := ies[1]
// the decoded IE has no name and the type always defaults to OctetArray
require.Equal(t, entities.NewInfoElement("", unknownID, entities.OctetArray, enterpriseID, 4), ieWithValue.GetInfoElement())
value := ieWithValue.GetOctetArrayValue()
assert.Equal(t, unknownValue, binary.BigEndian.Uint32(value))
} else if mode == DecodingModeLenientDropUnknown {
require.Len(t, ies, 2)
}
})
}
}
}

func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorInput {
if network == tcpTransport {
var address string
Expand Down
31 changes: 31 additions & 0 deletions pkg/entities/ie.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@
return nil, fmt.Errorf("error when converting value to []bytes for decoding")
}
switch dataType {
case OctetArray:
return value, nil

Check warning on line 166 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L165-L166

Added lines #L165 - L166 were not covered by tests
case Unsigned8:
return value[0], nil
case Unsigned16:
Expand Down Expand Up @@ -211,6 +213,12 @@
// returns appropriate InfoElementWithValue.
func DecodeAndCreateInfoElementWithValue(element *InfoElement, value []byte) (InfoElementWithValue, error) {
switch element.DataType {
case OctetArray:
var val []byte
if value != nil {
val = append(val, value...)
}
return NewOctetArrayInfoElement(element, val), nil

Check warning on line 221 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L216-L221

Added lines #L216 - L221 were not covered by tests
case Unsigned8:
var val uint8
if value == nil {
Expand Down Expand Up @@ -354,6 +362,10 @@
// used for testing.
func EncodeToIEDataType(dataType IEDataType, val interface{}) ([]byte, error) {
switch dataType {
case OctetArray:
// Supporting the type properly would require knowing whether we are dealing with a
// fixed-length or variable-length element.
return nil, fmt.Errorf("octet array data type not supported by this method yet")

Check warning on line 368 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L365-L368

Added lines #L365 - L368 were not covered by tests
case Unsigned8:
v, ok := val.(uint8)
if !ok {
Expand Down Expand Up @@ -521,6 +533,25 @@
return fmt.Errorf("buffer size is not enough for encoding")
}
switch element.GetDataType() {
case OctetArray:
v := element.GetOctetArrayValue()
ieLen := element.GetInfoElement().Len
if ieLen < VariableLength {
// fixed length case
if len(v) != int(ieLen) {
return fmt.Errorf("invalid value for fixed-length octet array: length mismatch")
}

Check warning on line 543 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L542-L543

Added lines #L542 - L543 were not covered by tests
copy(buffer[index:], v)
} else if len(v) < 255 {
buffer[index] = uint8(len(v))
copy(buffer[index+1:], v)
} else if len(v) <= math.MaxUint16 {
buffer[index] = byte(255) // marker byte for long array
binary.BigEndian.PutUint16(buffer[index+1:index+3], uint16(len(v)))
copy(buffer[index+3:], v)
} else {
return fmt.Errorf("provided OctetArray value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16)
}

Check warning on line 554 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L553-L554

Added lines #L553 - L554 were not covered by tests
case Unsigned8:
copy(buffer[index:index+1], []byte{element.GetUnsigned8Value()})
case Unsigned16:
Expand Down
Loading
Loading