Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1994: remove unneeded bpf map update calls #466

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions bpf/dns_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
#define DNS_DEFAULT_PORT 53
#define DNS_QR_FLAG 0x8000
#define UDP_MAXMSG 512
#define EINVAL 22
#define ENOENT 2

struct dns_header {
u16 id;
Expand Down Expand Up @@ -91,8 +89,11 @@ static __always_inline int track_dns_packet(struct __sk_buff *skb, pkt_info *pkt

if ((flags & DNS_QR_FLAG) == 0) { /* dns query */
fill_dns_id(pkt->id, &dns_req, dns_id, false);
if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
ret = bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error creating new dns entry %d\n", ret);
}
}
} else { /* dns response */
fill_dns_id(pkt->id, &dns_req, dns_id, true);
Expand Down
100 changes: 52 additions & 48 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@
*/
#include "network_events_monitoring.h"

// Merge one more packet into a flow entry that already exists in the
// per-CPU aggregated-flows hashmap. The entry is mutated in place through
// the looked-up pointer, so no bpf_map_update_elem call is required.
static inline void update_existing_flow(flow_metrics *flow, pkt_info *pkt, int dns_errno,
                                        u64 len) {
    flow->packets += 1;
    flow->bytes += len;
    flow->end_mono_time_ts = pkt->current_ts;
    // Concurrent insertions into the percpu hashmap can leave
    // start_mono_time_ts unset; backfill it from this packet's timestamp.
    if (flow->start_mono_time_ts == 0) {
        flow->start_mono_time_ts = pkt->current_ts;
    }
    flow->flags |= pkt->flags;
    flow->dscp = pkt->dscp;
    flow->dns_record.id = pkt->dns_id;
    flow->dns_record.flags = pkt->dns_flags;
    flow->dns_record.latency = pkt->dns_latency;
    flow->dns_record.errno = dns_errno;
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
Expand All @@ -70,6 +88,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
struct ethhdr *eth = (struct ethhdr *)data;
u64 len = skb->len;

if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
return TC_ACT_OK;
Expand All @@ -93,33 +112,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = pkt.current_ts;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = pkt.current_ts;
}
aggregate_flow->flags |= pkt.flags;
aggregate_flow->dscp = pkt.dscp;
aggregate_flow->dns_record.id = pkt.dns_id;
aggregate_flow->dns_record.flags = pkt.dns_flags;
aggregate_flow->dns_record.latency = pkt.dns_latency;
aggregate_flow->dns_record.errno = dns_errno;
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (ret != 0) {
// usually error -16 (-EBUSY) is printed here.
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
// which can't be deduplicated.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
bpf_printk("error updating flow %d\n", ret);
}
// Update global counter for hashmap update errors
increase_counter(HASHMAP_FLOWS_DROPPED);
}
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
jotak marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Key does not exist in the map, and will need to create a new entry.
u64 rtt = 0;
Expand All @@ -128,7 +121,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}
flow_metrics new_flow = {
.packets = 1,
.bytes = skb->len,
.bytes = len,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
Expand All @@ -140,31 +133,42 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
.flow_rtt = rtt,
};

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
// so we need to specify BPF_ANY
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated at userspace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error adding flow %d\n", ret);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not personally consider EEXIST an error that needs logging (here, or in other places you have trace logging). Only if the subsequent lookup then fails to return an entry (see comment below).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the thing is map update can fail for many reasons ebusy or e2big so i need to capture those as well , now if eexist error will be happening alot I can filter out the trace for exist error and with counter back we will know when we drop flows

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so that's basically what I meant: Use the trace message only for errors other than EEXIST (just move the log statement a bit further down where you're handling those anyway)


new_flow.errno = -ret;
flow_record *record =
(flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
if (ret == -EEXIST) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
jotak marked this conversation as resolved.
Show resolved Hide resolved
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
}
// Update global counter for hashmap update errors
increase_counter(HASHMAP_FLOWS_DROPPED);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could keep the HASHMAP_FLOWS_DROPPED counter and update it in an "else" branch here. I would expect this to basically never happen, but just to be on the safe side, and since you already have that counter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that is why I felt its less useful but I can reuse it when eexist fail to lkup

} else {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated at userspace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
new_flow.errno = -ret;
flow_record *record =
(flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
}
return TC_ACT_OK;
}
return TC_ACT_OK;
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
}
return TC_ACT_OK;
Expand Down
10 changes: 3 additions & 7 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@

#include "utils.h"

// remove the comment below to enable debug prints
//#define ENABLE_BPF_PRINTK
#ifdef ENABLE_BPF_PRINTK
#define BPF_PRINTK(fmt, args...) bpf_printk(fmt, ##args)
#else
#define BPF_PRINTK(fmt, args...)
#endif
// Conditional trace print, gated at runtime on the trace_messages flag.
// Wrapped in do { } while (0) so the macro expands to exactly one
// statement: a bare `if (trace_messages) bpf_printk(...)` body would
// capture a following `else` (dangling-else) when the macro is used
// un-braced, e.g. `if (cond) BPF_PRINTK(...); else ...`.
#define BPF_PRINTK(fmt, args...)                                               \
    do {                                                                       \
        if (trace_messages) {                                                  \
            bpf_printk(fmt, ##args);                                           \
        }                                                                      \
    } while (0)

static __always_inline int is_zero_ip(u8 *ip, u8 len) {
for (int i = 0; i < len; i++) {
Expand Down
67 changes: 36 additions & 31 deletions bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,44 @@ static inline bool md_already_exists(u8 network_events[MAX_NETWORK_EVENTS][MAX_E
return false;
}

// Look up the flow identified by `id` in the aggregated-flows map and, if
// present, record the network-event metadata cookie on it (deduplicated via
// md_already_exists). Returns 0 when the flow was found and updated,
// -1 when the flow is absent or md_len is out of range.
static inline int lookup_and_update_existing_flow_network_events(flow_id *id, u8 md_len,
                                                                 u8 *user_cookie) {
    u8 cookie[MAX_EVENT_MD];

    // Reject oversized metadata before the probe read so `cookie` can never
    // be overflowed, and zero the buffer so the fixed-size memcpy/compare
    // below never reads uninitialized stack bytes when md_len < MAX_EVENT_MD.
    if (md_len > MAX_EVENT_MD) {
        return -1;
    }
    __builtin_memset(cookie, 0, sizeof(cookie));
    bpf_probe_read(cookie, md_len, user_cookie);

    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
    if (aggregate_flow != NULL) {
        u8 idx = aggregate_flow->network_events_idx;
        aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
        // Bounds re-checked here to keep the BPF verifier happy.
        if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
            if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
                __builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
                aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
            }
            return 0;
        }
    }
    return -1;
}

static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_metadata *md) {
u8 dscp = 0, protocol = 0, md_len = 0;
u16 family = 0, flags = 0;
u8 *user_cookie = NULL;
u8 cookie[MAX_EVENT_MD];
long ret = 0;
u64 len = 0;
flow_id id;

__builtin_memset(&id, 0, sizeof(id));
__builtin_memset(cookie, 0, sizeof(cookie));

md_len = BPF_CORE_READ(md, user_cookie_len);
user_cookie = (u8 *)BPF_CORE_READ(md, user_cookie);
if (md_len == 0 || md_len > MAX_EVENT_MD || user_cookie == NULL) {
return -1;
}

bpf_probe_read(cookie, md_len, user_cookie);

id.if_index = BPF_CORE_READ(md, in_ifindex);

len = BPF_CORE_READ(skb, len);
Expand Down Expand Up @@ -88,31 +106,10 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me

for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
u8 idx = aggregate_flow->network_events_idx;
aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
// Needed to check length here again to keep JIT verifier happy
if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
__builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
}
ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (ret == 0) {
return 0;
}
} else {
return -1;
}
}
}

if (ret != 0) {
if (trace_messages) {
bpf_printk("error network events updating existing flow %d\n", ret);
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}
return ret;
}

// there is no matching flows so lets create new one and add the network event metadata
Expand All @@ -128,9 +125,17 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
};
bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
new_flow.network_events_idx++;
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error network events creating new flow %d\n", ret);
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error network events creating new flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret != 0 && trace_messages) {
bpf_printk("error network events failed to update an existing flow %d\n", ret);
}
}
}
return ret;
}
Expand Down
18 changes: 11 additions & 7 deletions bpf/pkt_drops.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ static inline long pkt_drop_lookup_and_update_flow(flow_id *id, u8 state, u16 fl
aggregate_flow->pkt_drops.latest_state = state;
aggregate_flow->pkt_drops.latest_flags = flags;
aggregate_flow->pkt_drops.latest_drop_cause = reason;
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_EXIST);
if (trace_messages && ret != 0) {
bpf_printk("error packet drop updating flow %d\n", ret);
}
return 0;
}
return -1;
Expand Down Expand Up @@ -93,9 +89,17 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
.pkt_drops.latest_flags = flags,
.pkt_drops.latest_drop_cause = reason,
};
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error packet drop creating new flow %d\n", ret);
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error packet drop creating new flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = pkt_drop_lookup_and_update_flow(&id, state, flags, reason, len);
if (ret != 0 && trace_messages) {
bpf_printk("error packet drop updating an existing flow %d\n", ret);
}
}
}

return ret;
Expand Down
18 changes: 11 additions & 7 deletions bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ static inline int rtt_lookup_and_update_flow(flow_id *id, u16 flags, u64 rtt) {
if (aggregate_flow->flow_rtt < rtt) {
aggregate_flow->flow_rtt = rtt;
}
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error rtt updating flow %d\n", ret);
}
return 0;
}
return -1;
Expand Down Expand Up @@ -87,9 +83,17 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
.flow_rtt = rtt,
.dscp = dscp,
};
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error rtt track creating flow %d\n", ret);
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error rtt track creating flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = rtt_lookup_and_update_flow(&id, flags, rtt);
if (trace_messages && ret != 0) {
bpf_printk("error rtt track updating an existing flow %d\n", ret);
}
}
}

return 0;
Expand Down
4 changes: 4 additions & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#define DISCARD 1
#define SUBMIT 0

#define ENOENT 2
#define EEXIST 17
#define EINVAL 22

// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum tcp_flags_t {
FIN_FLAG = 0x01,
Expand Down
Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
Loading