Skip to content

Commit

Permalink
NETOBSERV-1996: in-kernel de-duplication
Browse files Browse the repository at this point in the history
- Remove interface from the flow key; instead, use it as flow value. The
  first interface+dir seen for a given flow is the one taken into account
  for counters. Other interfaces+dirs are stored in a separate map for this
  flow. This algorithm is more or less the deduper algorithm that we had in
  user-space.
- Remove user-space deduper
- Adapt user-space model for the new interfaces+directions array
  provided directly from ebpf structs
- Remove "decorator" (which was doing the interface naming). This is
  just for simplification. This enrichment is now done in a more
straightforward way, when creating the Record objects
  • Loading branch information
jotak committed Nov 29, 2024
1 parent bf6ad1e commit 8693417
Show file tree
Hide file tree
Showing 36 changed files with 344 additions and 1,150 deletions.
39 changes: 33 additions & 6 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *
bpf_spin_unlock(&aggregate_flow->lock);
}

// Records the (if_index, direction) pair in value->observed_intf unless the
// same pair is already listed there.
//
// value:     metrics entry whose observed_intf array and nb_observed_intf
//            counter are read and, when a new pair is appended, updated.
// if_index:  interface index to record.
// direction: direction to record alongside the interface index.
//
// When nb_observed_intf has already reached MAX_OBSERVED_INTERFACES the
// function returns without writing anything, so additional pairs are dropped.
static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) {
    // No free slot left: nothing can be appended.
    if (value->nb_observed_intf >= MAX_OBSERVED_INTERFACES) {
        return;
    }

    // Scan the slots already in use; an exact match means there is nothing to do.
    for (u8 slot = 0; slot < value->nb_observed_intf; slot++) {
        if (value->observed_intf[slot].direction != direction) {
            continue;
        }
        if (value->observed_intf[slot].if_index == if_index) {
            return;
        }
    }

    // First time this pair is seen: store it in the next free slot.
    value->observed_intf[value->nb_observed_intf].if_index = if_index;
    value->observed_intf[value->nb_observed_intf].direction = direction;
    value->nb_observed_intf++;
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
Expand All @@ -97,8 +111,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = direction;
pkt.direction = direction;

// check if this packet needs to be filtered if the filtering feature is enabled
bool skip = check_and_do_flow_filtering(&pkt, 0);
Expand All @@ -110,14 +123,28 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
if (enable_dns_tracking) {
dns_errno = track_dns_packet(skb, &pkt);
}
// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
if (aggregate_flow->if_index_first_seen == skb->ifindex) {
update_existing_flow(aggregate_flow, &pkt, len, dns_errno);
} else {
// Only add info that we've seen this interface
// TODO: should we merge DNS info? TCP flags?
additional_metrics *aggregate_flow =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (aggregate_flow != NULL) {
add_observed_intf(aggregate_flow, skb->ifindex, direction);
} else {
additional_metrics new_metrics = {};
add_observed_intf(&new_metrics, skb->ifindex, direction);
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
}
}
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
.if_index_first_seen = skb->ifindex,
.direction_first_seen = direction,
.packets = 1,
.bytes = len,
.eth_protocol = pkt.eth_protocol,
Expand All @@ -142,7 +169,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
update_existing_flow(aggregate_flow, &pkt, len, dns_errno);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
Expand Down
4 changes: 2 additions & 2 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static __always_inline int do_flow_filter_lookup(pkt_info *pkt, struct filter_ke

if (!is_zero_ip(rule->ip, len)) {
// for Ingress side we can filter using dstIP and for Egress side we can filter using srcIP
if (pkt->id->direction == INGRESS) {
if (pkt->direction == INGRESS) {
if (is_equal_ip(rule->ip, pkt->id->dst_ip + offset, len)) {
BPF_PRINTK("dstIP matched\n");
result++;
Expand All @@ -184,7 +184,7 @@ static __always_inline int do_flow_filter_lookup(pkt_info *pkt, struct filter_ke
}

if (rule->direction != MAX_DIRECTION) {
if (rule->direction == pkt->id->direction) {
if (rule->direction == pkt->direction) {
BPF_PRINTK("direction matched\n");
result++;
} else {
Expand Down
9 changes: 3 additions & 6 deletions bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,9 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
return 0;
}

for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}

// there are no matching flows, so let's create a new one and add the network event metadata
Expand Down
5 changes: 2 additions & 3 deletions bpf/pca.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
return false;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = dir;
// Set extra fields
pkt.direction = dir;

// check if this packet needs to be filtered if the filtering feature is enabled
bool skip = check_and_do_flow_filtering(&pkt, 0);
Expand Down
28 changes: 18 additions & 10 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ typedef __u64 u64;
#define MAX_FILTER_ENTRIES 1 // we have only one global filter
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4
#define MAX_OBSERVED_INTERFACES 4

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum direction_t {
Expand All @@ -79,6 +80,9 @@ typedef struct flow_metrics_t {
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
// OS interface index
u32 if_index_first_seen;
u8 direction_first_seen;
u32 packets;
u64 bytes;
// Flow start and end times as monotonic timestamps in nanoseconds
Expand Down Expand Up @@ -112,11 +116,15 @@ typedef struct additional_metrics_t {
u64 flow_rtt;
u8 network_events_idx;
u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD];
struct observed_intf_t {
u8 direction;
u32 if_index;
} __attribute__((packed)) observed_intf[MAX_OBSERVED_INTERFACES];
u8 nb_observed_intf;
} additional_metrics;

// Attributes that uniquely identify a flow
typedef struct flow_id_t {
u8 direction;
// L3 network layer
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
Expand All @@ -129,8 +137,6 @@ typedef struct flow_id_t {
// ICMP protocol
u8 icmp_type;
u8 icmp_code;
// OS interface index
u32 if_index;
} flow_id;

// Flow record is a tuple containing both flow identifier and metrics. It is used to send
Expand All @@ -146,6 +152,7 @@ typedef struct pkt_info_t {
flow_id *id;
u64 current_ts; // ts recorded when pkt came.
u16 eth_protocol;
u8 direction;
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
u16 flags; // TCP specific
Expand Down Expand Up @@ -230,12 +237,13 @@ const struct flow_metrics_t *unused2 __attribute__((unused));
const struct additional_metrics_t *unused3 __attribute__((unused));
const struct dns_record_t *unused4 __attribute__((unused));
const struct pkt_drops_t *unused5 __attribute__((unused));
const struct filter_value_t *unused6 __attribute__((unused));
const struct flow_id_t *unused7 __attribute__((unused));
const struct flow_record_t *unused8 __attribute__((unused));
const struct pkt_id_t *unused9 __attribute__((unused));
const struct filter_key_t *unused10 __attribute__((unused));
const enum filter_action_t *unused11 __attribute__((unused));
const enum global_counters_key_t *unused12 __attribute__((unused));
const struct observed_intf_t *unused6 __attribute__((unused));
const struct filter_value_t *unused7 __attribute__((unused));
const struct flow_id_t *unused8 __attribute__((unused));
const struct flow_record_t *unused9 __attribute__((unused));
const struct pkt_id_t *unused10 __attribute__((unused));
const struct filter_key_t *unused11 __attribute__((unused));
const enum filter_action_t *unused12 __attribute__((unused));
const enum global_counters_key_t *unused13 __attribute__((unused));

#endif /* __TYPES_H__ */
3 changes: 0 additions & 3 deletions cmd/netobserv-ebpf-agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func main() {
Error("PProf HTTP listener stopped working")
}()
}
if config.DeduperFCExpiry == 0 {
config.DeduperFCExpiry = 2 * config.CacheActiveTimeout
}

logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded")

Expand Down
53 changes: 16 additions & 37 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,8 @@ type Flows struct {
rbTracer *flow.RingBufTracer
accounter *flow.Accounter
limiter *flow.CapacityLimiter
deduper node.MiddleFunc[[]*model.Record, []*model.Record]
exporter node.TerminalFunc[[]*model.Record]

// elements used to decorate flows with extra information
interfaceNamer flow.InterfaceNamer
agentIP net.IP

status Status
promoServer *http.Server
sampleDecoder *ovnobserv.SampleDecoder
Expand Down Expand Up @@ -275,6 +270,8 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
}
return iface
}
model.SetGlobals(agentIP, interfaceNamer)

var promoServer *http.Server
if cfg.MetricsEnable {
promoServer = promo.InitializePrometheus(m.Settings)
Expand All @@ -287,26 +284,19 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
limiter := flow.NewCapacityLimiter(m)
var deduper node.MiddleFunc[[]*model.Record, []*model.Record]
if cfg.Deduper == DeduperFirstCome {
deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m)
}

return &Flows{
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
deduper: deduper,
agentIP: agentIP,
interfaceNamer: interfaceNamer,
promoServer: promoServer,
sampleDecoder: s,
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
promoServer: promoServer,
sampleDecoder: s,
}, nil
}

Expand Down Expand Up @@ -501,9 +491,6 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo
limiter := node.AsMiddle(f.limiter.Limit,
node.ChannelBufferLen(f.cfg.BuffersLength))

decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))

ebl := f.cfg.ExporterBufferLength
if ebl == 0 {
ebl = f.cfg.BuffersLength
Expand All @@ -514,17 +501,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo

rbTracer.SendsTo(accounter)

if f.deduper != nil {
deduper := node.AsMiddle(f.deduper, node.ChannelBufferLen(f.cfg.BuffersLength))
mapTracer.SendsTo(deduper)
accounter.SendsTo(deduper)
deduper.SendsTo(limiter)
} else {
mapTracer.SendsTo(limiter)
accounter.SendsTo(limiter)
}
limiter.SendsTo(decorator)
decorator.SendsTo(export)
mapTracer.SendsTo(limiter)
accounter.SendsTo(limiter)
limiter.SendsTo(export)

alog.Debug("starting graph")
mapTracer.Start()
Expand Down
Loading

0 comments on commit 8693417

Please sign in to comment.