Skip to content

Commit

Permalink
Ability to support unknown IEs when decoding template sets
Browse files Browse the repository at this point in the history
Introduce a "decoding mode" configuration for the collector process,
which can take one of 3 values: Strict, LenientKeepUnknown,
LenientDropUnknown. When the mode is Strict, unknown IEs are
rejected. This is the default mode and matches the behavior prior to
this patch. When the mode is LenientKeepUnknown, unknown IEs are
accepted and will be preserved in data records. When the mode is
LenientDropUnknown, unknown IEs are accepted but will not be included in
data records (creating a mismatch between the data record and the
template).

When decoding is not strict and the IE is not found in the registry, the
name and type of the IE are not known. Name is kept as an empty
string. Type is always set to OctetArray. The length is known as it is
included in the template record.

This change required adding support for the OctetArray type
(fixed-length or variable-length).

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas committed Nov 5, 2024
1 parent 96bf286 commit 6044ae4
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 44 deletions.
85 changes: 69 additions & 16 deletions pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,32 @@ import (
"github.com/vmware/go-ipfix/pkg/util"
)

// DecodingMode specifies how unknown information elements (in templates) are handled when decoding.
// Unknown information elements are elements which are not part of the static registry included with
// the library.
// Note that regardless of the DecodingMode, data sets must always match the corresponding template.
type DecodingMode string

const (
// DecodingModeStrict will cause decoding to fail when an unknown IE is encountered in a template.
DecodingModeStrict DecodingMode = "Strict"
// DecodingModeLenientKeepUnknown will accept unknown IEs in templates. When decoding the
// corresponding field in a data record, the value will be preserved (as an octet array).
DecodingModeLenientKeepUnknown DecodingMode = "LenientKeepUnknown"
// DecodingModeLenientDropUnknown will accept unknown IEs in templates. When decoding the
// corresponding field in a data record, the value will be dropped (information element will
// not be present in the resulting Record). Be careful when using this mode as the IEs
// included in the resulting Record will no longer match the received template.
DecodingModeLenientDropUnknown DecodingMode = "LenientDropUnknown"
)

type CollectingProcess struct {
// for each obsDomainID, there is a map of templates
templatesMap map[uint32]map[uint16][]*entities.InfoElement
// mutex allows multiple readers or one writer at the same time
mutex sync.RWMutex
// template lifetime
templateTTL uint32
templateTTL time.Duration
// server information
address string
// server protocol
Expand All @@ -57,6 +76,9 @@ type CollectingProcess struct {
// numExtraElements specifies number of elements that could be added after
// decoding the IPFIX data packet.
numExtraElements int
// decodingMode specifies how unknown information elements (in templates) are handled when
// decoding.
decodingMode DecodingMode
// caCert, serverCert and serverKey are for storing encryption info when using TLS/DTLS
caCert []byte
serverCert []byte
Expand All @@ -80,6 +102,10 @@ type CollectorInput struct {
ServerCert []byte
ServerKey []byte
NumExtraElements int
// DecodingMode specifies how unknown information elements (in templates) are handled when
// decoding. The default value is DecodingModeStrict for historical reasons. For most uses,
// DecodingModeLenientKeepUnknown is the most appropriate mode.
DecodingMode DecodingMode
}

type clientHandler struct {
Expand All @@ -88,10 +114,24 @@ type clientHandler struct {
}

func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) {
templateTTLSeconds := input.TemplateTTL
if input.Protocol == "udp" && templateTTLSeconds == 0 {
templateTTLSeconds = entities.TemplateTTL
}
templateTTL := time.Duration(templateTTLSeconds) * time.Second
decodingMode := input.DecodingMode
if decodingMode == "" {
decodingMode = DecodingModeStrict
}
klog.InfoS(
"Initializing the collecting process",
"encrypted", input.IsEncrypted, "address", input.Address, "protocol", input.Protocol, "maxBufferSize", input.MaxBufferSize,
"templateTTL", templateTTL, "numExtraElements", input.NumExtraElements, "decodingMode", decodingMode,
)
collectProc := &CollectingProcess{
templatesMap: make(map[uint32]map[uint16][]*entities.InfoElement),
mutex: sync.RWMutex{},
templateTTL: input.TemplateTTL,
templateTTL: templateTTL,
address: input.Address,
protocol: input.Protocol,
maxBufferSize: input.MaxBufferSize,
Expand All @@ -103,11 +143,13 @@ func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) {
serverCert: input.ServerCert,
serverKey: input.ServerKey,
numExtraElements: input.NumExtraElements,
decodingMode: decodingMode,
}
return collectProc, nil
}

func (cp *CollectingProcess) Start() {
klog.Info("Starting the collecting process")
if cp.protocol == "tcp" {
cp.startTCPServer()
} else if cp.protocol == "udp" {
Expand All @@ -119,7 +161,7 @@ func (cp *CollectingProcess) Stop() {
close(cp.stopChan)
// wait for all connections to be safely deleted and returned
cp.wg.Wait()
klog.Info("stopping the collecting process")
klog.Info("Stopping the collecting process")
}

func (cp *CollectingProcess) GetAddress() net.Addr {
Expand Down Expand Up @@ -228,7 +270,11 @@ func (cp *CollectingProcess) decodeTemplateSet(templateBuffer *bytes.Buffer, obs
enterpriseID = registry.IANAEnterpriseID
element, err = registry.GetInfoElementFromID(elementID, enterpriseID)
if err != nil {
return nil, err
if cp.decodingMode == DecodingModeStrict {
return nil, err
}
klog.InfoS("Template includes an information element that is not present in registry", "obsDomainID", obsDomainID, "templateID", templateID, "enterpriseID", enterpriseID, "elementID", elementID)
element = entities.NewInfoElement("", elementID, entities.OctetArray, enterpriseID, elementLength)
}
} else {
/*
Expand All @@ -254,7 +300,11 @@ func (cp *CollectingProcess) decodeTemplateSet(templateBuffer *bytes.Buffer, obs
elementID = binary.BigEndian.Uint16(elementid)
element, err = registry.GetInfoElementFromID(elementID, enterpriseID)
if err != nil {
return nil, err
if cp.decodingMode == DecodingModeStrict {
return nil, err
}
klog.InfoS("Template includes an information element that is not present in registry", "obsDomainID", obsDomainID, "templateID", templateID, "enterpriseID", enterpriseID, "elementID", elementID)
element = entities.NewInfoElement("", elementID, entities.OctetArray, enterpriseID, elementLength)
}
}
if elementsWithValue[i], err = entities.DecodeAndCreateInfoElementWithValue(element, nil); err != nil {
Expand All @@ -281,17 +331,24 @@ func (cp *CollectingProcess) decodeDataSet(dataBuffer *bytes.Buffer, obsDomainID
}

for dataBuffer.Len() > 0 {
elements := make([]entities.InfoElementWithValue, len(template), len(template)+cp.numExtraElements)
for i, element := range template {
elements := make([]entities.InfoElementWithValue, 0, len(template)+cp.numExtraElements)
for _, ie := range template {
var length int
if element.Len == entities.VariableLength { // string
if ie.Len == entities.VariableLength { // string / octet array
length = getFieldLength(dataBuffer)
} else {
length = int(element.Len)
length = int(ie.Len)
}
if elements[i], err = entities.DecodeAndCreateInfoElementWithValue(element, dataBuffer.Next(length)); err != nil {
element, err := entities.DecodeAndCreateInfoElementWithValue(ie, dataBuffer.Next(length))
if err != nil {
return nil, err
}
// A missing name means an unknown element was received
if cp.decodingMode == DecodingModeLenientDropUnknown && ie.Name == "" {
klog.V(5).InfoS("Dropping field for unknown information element", "obsDomainID", obsDomainID, "ie", ie)
continue
}
elements = append(elements, element)
}
err = dataSet.AddRecordV2(elements, templateID)
if err != nil {
Expand All @@ -313,16 +370,12 @@ func (cp *CollectingProcess) addTemplate(obsDomainID uint32, templateID uint16,
}
cp.templatesMap[obsDomainID][templateID] = elements
// template lifetime management
if cp.protocol == "tcp" {
if cp.protocol != "udp" {
return
}

// Handle udp template expiration
if cp.templateTTL == 0 {
cp.templateTTL = entities.TemplateTTL // Default value
}
go func() {
ticker := time.NewTicker(time.Duration(cp.templateTTL) * time.Second)
ticker := time.NewTicker(cp.templateTTL)
defer ticker.Stop()
select {
case <-ticker.C:
Expand Down
77 changes: 77 additions & 0 deletions pkg/collector/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"fmt"
"net"
"runtime"
"sync"
Expand All @@ -31,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/exporter"
"github.com/vmware/go-ipfix/pkg/registry"
testcerts "github.com/vmware/go-ipfix/pkg/test/certs"
)
Expand Down Expand Up @@ -89,6 +92,7 @@ func TestUDPCollectingProcess_ReceiveTemplateRecord(t *testing.T) {
t.Fatalf("UDP Collecting Process does not start correctly: %v", err)
}
go cp.Start()

// wait until collector is ready
waitForCollectorReady(t, cp)
collectorAddr := cp.GetAddress()
Expand Down Expand Up @@ -605,6 +609,79 @@ func TestUDPCollectingProcessIPv6(t *testing.T) {
assert.Equal(t, net.ParseIP("2001:0:3238:DFE1:63::FEFB"), ie.GetIPAddressValue())
}

// TestUnknownInformationElement validates that message decoding when dealing with unknown IEs (not
// part of the static registry included in this project). All 3 supported decoding modes are tested.
func TestUnknownInformationElement(t *testing.T) {
const (
templateID = 100
obsDomainID = 0xabcd
unknownID = 999
unknownValue = uint32(0x1234)
)

for _, enterpriseID := range []uint32{registry.IANAEnterpriseID, registry.AntreaEnterpriseID} {
for _, mode := range []DecodingMode{DecodingModeStrict, DecodingModeLenientKeepUnknown, DecodingModeLenientDropUnknown} {
t.Run(fmt.Sprintf("enterpriseID-%d_%s", enterpriseID, mode), func(t *testing.T) {
input := getCollectorInput(tcpTransport, false, false)
input.DecodingMode = mode
cp, err := InitCollectingProcess(input)
require.NoError(t, err)
defer cp.Stop()

go func() { // remove the message from the message channel
for range cp.GetMsgChan() {
}
}()

// First, send template set.

unknownIE := entities.NewInfoElement("foo", unknownID, entities.Unsigned32, enterpriseID, 4)
knownIE1, _ := registry.GetInfoElement("octetDeltaCount", registry.IANAEnterpriseID)
knownIE2, _ := registry.GetInfoElement("sourceNodeName", registry.AntreaEnterpriseID)
templateSet, err := entities.MakeTemplateSet(templateID, []*entities.InfoElement{knownIE1, unknownIE, knownIE2})
require.NoError(t, err)
templateBytes, err := exporter.CreateIPFIXMsg(templateSet, obsDomainID, 0 /* seqNumber */, time.Now())
require.NoError(t, err)
_, err = cp.decodePacket(bytes.NewBuffer(templateBytes), "1.2.3.4:12345")
// If decoding is strict, there will be an error and we need to stop the test.
if mode == DecodingModeStrict {
require.Error(t, err)
return
}
require.NoError(t, err)

// Second, send data set.

unknownIEWithValue := entities.NewUnsigned32InfoElement(unknownIE, unknownValue)
knownIE1WithValue := entities.NewUnsigned64InfoElement(knownIE1, 0x100)
knownIE2WithValue := entities.NewStringInfoElement(knownIE2, "node-1")
dataSet, err := entities.MakeDataSet(templateID, []entities.InfoElementWithValue{knownIE1WithValue, unknownIEWithValue, knownIE2WithValue})
require.NoError(t, err)
dataBytes, err := exporter.CreateIPFIXMsg(dataSet, obsDomainID, 1 /* seqNumber */, time.Now())
require.NoError(t, err)
msg, err := cp.decodePacket(bytes.NewBuffer(dataBytes), "1.2.3.4:12345")
require.NoError(t, err)
records := msg.GetSet().GetRecords()
require.Len(t, records, 1)
record := records[0]
ies := record.GetOrderedElementList()

if mode == DecodingModeLenientKeepUnknown {
require.Len(t, ies, 3)
// the unknown IE after decoding
ieWithValue := ies[1]
// the decoded IE has no name and the type always defaults to OctetArray
require.Equal(t, entities.NewInfoElement("", unknownID, entities.OctetArray, enterpriseID, 4), ieWithValue.GetInfoElement())
value := ieWithValue.GetOctetArrayValue()
assert.Equal(t, unknownValue, binary.BigEndian.Uint32(value))
} else if mode == DecodingModeLenientDropUnknown {
require.Len(t, ies, 2)
}
})
}
}
}

func getCollectorInput(network string, isEncrypted bool, isIPv6 bool) CollectorInput {
if network == tcpTransport {
var address string
Expand Down
31 changes: 31 additions & 0 deletions pkg/entities/ie.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func decodeToIEDataType(dataType IEDataType, val interface{}) (interface{}, erro
return nil, fmt.Errorf("error when converting value to []bytes for decoding")
}
switch dataType {
case OctetArray:
return value, nil

Check warning on line 166 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L165-L166

Added lines #L165 - L166 were not covered by tests
case Unsigned8:
return value[0], nil
case Unsigned16:
Expand Down Expand Up @@ -211,6 +213,12 @@ func decodeToIEDataType(dataType IEDataType, val interface{}) (interface{}, erro
// returns appropriate InfoElementWithValue.
func DecodeAndCreateInfoElementWithValue(element *InfoElement, value []byte) (InfoElementWithValue, error) {
switch element.DataType {
case OctetArray:
var val []byte
if value != nil {
val = append(val, value...)
}
return NewOctetArrayInfoElement(element, val), nil

Check warning on line 221 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L216-L221

Added lines #L216 - L221 were not covered by tests
case Unsigned8:
var val uint8
if value == nil {
Expand Down Expand Up @@ -354,6 +362,10 @@ func DecodeAndCreateInfoElementWithValue(element *InfoElement, value []byte) (In
// used for testing.
func EncodeToIEDataType(dataType IEDataType, val interface{}) ([]byte, error) {
switch dataType {
case OctetArray:
// Supporting the type properly would require knowing whether we are dealing with a
// fixed-length or variable-length element.
return nil, fmt.Errorf("octet array data type not supported by this method yet")

Check warning on line 368 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L365-L368

Added lines #L365 - L368 were not covered by tests
case Unsigned8:
v, ok := val.(uint8)
if !ok {
Expand Down Expand Up @@ -521,6 +533,25 @@ func encodeInfoElementValueToBuff(element InfoElementWithValue, buffer []byte, i
return fmt.Errorf("buffer size is not enough for encoding")
}
switch element.GetDataType() {
case OctetArray:
v := element.GetOctetArrayValue()
ieLen := element.GetInfoElement().Len
if ieLen < VariableLength {
// fixed length case
if len(v) != int(ieLen) {
return fmt.Errorf("invalid value for fixed-length octet array: length mismatch")
}

Check warning on line 543 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L542-L543

Added lines #L542 - L543 were not covered by tests
copy(buffer[index:], v)
} else if len(v) < 255 {
buffer[index] = uint8(len(v))
copy(buffer[index+1:], v)
} else if len(v) <= math.MaxUint16 {
buffer[index] = byte(255) // marker byte for long array
binary.BigEndian.PutUint16(buffer[index+1:index+3], uint16(len(v)))
copy(buffer[index+3:], v)
} else {
return fmt.Errorf("provided OctetArray value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16)
}

Check warning on line 554 in pkg/entities/ie.go

View check run for this annotation

Codecov / codecov/patch

pkg/entities/ie.go#L553-L554

Added lines #L553 - L554 were not covered by tests
case Unsigned8:
copy(buffer[index:index+1], []byte{element.GetUnsigned8Value()})
case Unsigned16:
Expand Down
Loading

0 comments on commit 6044ae4

Please sign in to comment.