Skip to content

Commit

Permalink
Merge branch 'pr-466' into global-map
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Nov 29, 2024
2 parents e4a4fc3 + 58cbf78 commit bf6ad1e
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 90 deletions.
9 changes: 5 additions & 4 deletions bpf/dns_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
#define DNS_DEFAULT_PORT 53
#define DNS_QR_FLAG 0x8000
#define UDP_MAXMSG 512
#define EINVAL 22
#define ENOENT 2

struct dns_header {
u16 id;
Expand Down Expand Up @@ -91,8 +89,11 @@ static __always_inline int track_dns_packet(struct __sk_buff *skb, pkt_info *pkt

if ((flags & DNS_QR_FLAG) == 0) { /* dns query */
fill_dns_id(pkt->id, &dns_req, dns_id, false);
if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
ret = bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error creating new dns entry %d\n", ret);
}
}
} else { /* dns response */
fill_dns_id(pkt->id, &dns_req, dns_id, true);
Expand Down
89 changes: 53 additions & 36 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@
*/
#include "network_events_monitoring.h"

// Fold one observed packet into an already-existing flow entry, under the
// entry's spin lock so concurrent CPUs updating the same key don't race.
//
// NOTE: parameter order is (dns_errno, len) to match the call sites in
// flow_monitor(): update_existing_flow(aggregate_flow, &pkt, dns_errno, len).
// The previous order (u64 len before int dns_errno) silently transposed the
// byte count and the DNS errno at every call through implicit conversion.
static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt,
                                        int dns_errno, u64 len) {
    bpf_spin_lock(&aggregate_flow->lock);
    aggregate_flow->packets += 1;
    aggregate_flow->bytes += len;
    aggregate_flow->end_mono_time_ts = pkt->current_ts;
    // it might happen that start_mono_time hasn't been set due to
    // the way percpu hashmaps deal with concurrent map entries
    if (aggregate_flow->start_mono_time_ts == 0) {
        aggregate_flow->start_mono_time_ts = pkt->current_ts;
    }
    aggregate_flow->flags |= pkt->flags;
    aggregate_flow->dscp = pkt->dscp;
    aggregate_flow->dns_record.id = pkt->dns_id;
    aggregate_flow->dns_record.flags = pkt->dns_flags;
    aggregate_flow->dns_record.latency = pkt->dns_latency;
    aggregate_flow->dns_record.errno = dns_errno;
    bpf_spin_unlock(&aggregate_flow->lock);
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
Expand All @@ -70,6 +90,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
struct ethhdr *eth = (struct ethhdr *)data;
u64 len = skb->len;

if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
return TC_ACT_OK;
Expand All @@ -93,27 +114,12 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = pkt.current_ts;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = pkt.current_ts;
}
aggregate_flow->flags |= pkt.flags;
aggregate_flow->dscp = pkt.dscp;
aggregate_flow->dns_record.id = pkt.dns_id;
aggregate_flow->dns_record.flags = pkt.dns_flags;
aggregate_flow->dns_record.latency = pkt.dns_latency;
aggregate_flow->dns_record.errno = dns_errno;
bpf_spin_unlock(&aggregate_flow->lock);
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
.packets = 1,
.bytes = skb->len,
.bytes = len,
.eth_protocol = pkt.eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
Expand All @@ -127,31 +133,42 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
__builtin_memcpy(new_flow.dst_mac, pkt.dst_mac, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, pkt.src_mac, ETH_ALEN);

// Even though the lookup just missed, another CPU might insert the same flow
// concurrently, so use BPF_NOEXIST and handle the -EEXIST race below.
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated at userspace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error adding flow %d\n", ret);
}

new_flow.errno = -ret;
flow_record *record =
(flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
if (ret == -EEXIST) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
}
// Update global counter for hashmap update errors
increase_counter(HASHMAP_FLOWS_DROPPED);
}
} else {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated at userspace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
new_flow.errno = -ret;
flow_record *record =
(flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
}
return TC_ACT_OK;
}
return TC_ACT_OK;
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
}
return TC_ACT_OK;
Expand Down
10 changes: 3 additions & 7 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@

#include "utils.h"

// remove the comment below to enable debug prints
//#define ENABLE_BPF_PRINTK
#ifdef ENABLE_BPF_PRINTK
#define BPF_PRINTK(fmt, args...) bpf_printk(fmt, ##args)
#else
#define BPF_PRINTK(fmt, args...)
#endif
// Debug print, gated at runtime by the trace_messages flag.
// Wrapped in do { } while (0) so the macro expands to a single statement:
// a bare `if` here would swallow a following `else` (dangling-else hazard)
// and break when used unbraced inside another `if`.
#define BPF_PRINTK(fmt, args...)                                                                   \
    do {                                                                                           \
        if (trace_messages)                                                                        \
            bpf_printk(fmt, ##args);                                                               \
    } while (0)

static __always_inline int is_zero_ip(u8 *ip, u8 len) {
for (int i = 0; i < len; i++) {
Expand Down
66 changes: 37 additions & 29 deletions bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,37 @@ static inline bool md_already_exists(u8 network_events[MAX_NETWORK_EVENTS][MAX_E
return false;
}

// Look up the additional-metrics entry for `id` and, if found, record the
// network-event metadata cookie on it (deduplicated via md_already_exists).
// Returns 0 when the flow existed and was updated, -1 otherwise.
static inline int lookup_and_update_existing_flow_network_events(flow_id *id, u8 md_len,
                                                                 u8 *user_cookie) {
    u8 cookie[MAX_EVENT_MD];

    // Reject oversized metadata before reading: bpf_probe_read below copies
    // md_len bytes into a MAX_EVENT_MD-sized stack buffer.
    if (md_len > MAX_EVENT_MD) {
        return -1;
    }
    // Zero the buffer first: only md_len bytes are read from the user cookie,
    // but md_already_exists() and the memcpy below operate on the full
    // MAX_EVENT_MD width, so stale stack bytes would corrupt comparisons and
    // the stored event (the pre-refactor code memset this buffer).
    __builtin_memset(cookie, 0, sizeof(cookie));
    bpf_probe_read(cookie, md_len, user_cookie);

    additional_metrics *aggregate_flow = bpf_map_lookup_elem(&additional_flow_metrics, id);
    if (aggregate_flow != NULL) {
        u8 idx = aggregate_flow->network_events_idx;
        // Needed to check length here again to keep JIT verifier happy
        if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
            if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
                __builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
                // Ring-buffer style index: wraps and overwrites oldest slot.
                aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
            }
            return 0;
        }
    }
    return -1;
}

static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_metadata *md) {
u8 dscp = 0, protocol = 0, md_len = 0;
u16 family = 0;
u8 *user_cookie = NULL;
u8 cookie[MAX_EVENT_MD];
long ret = 0;
flow_id id;
pkt_info pkt;

__builtin_memset(&id, 0, sizeof(id));
__builtin_memset(&pkt, 0, sizeof(pkt));
__builtin_memset(cookie, 0, sizeof(cookie));
pkt.id = &id;

md_len = BPF_CORE_READ(md, user_cookie_len);
Expand All @@ -49,8 +68,6 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
return -1;
}

bpf_probe_read(cookie, md_len, user_cookie);

// read L2 info
core_fill_in_l2(skb, &pkt, &family);

Expand Down Expand Up @@ -84,29 +101,12 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
return 0;
}

additional_metrics *aggregate_flow = bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (aggregate_flow != NULL) {
u8 idx = aggregate_flow->network_events_idx;
// Needed to check length here again to keep JIT verifier happy
if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
__builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
}
ret = bpf_map_update_elem(&additional_flow_metrics, &id, aggregate_flow, BPF_ANY);
if (ret == 0) {
return 0;
}
} else {
return -1;
}
}

if (ret != 0) {
if (trace_messages) {
bpf_printk("error network events updating existing flow %d\n", ret);
for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}
return ret;
}

// there is no matching flows so lets create new one and add the network event metadata
Expand All @@ -115,9 +115,17 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
};
bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
new_flow.network_events_idx++;
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error network events creating new flow %d\n", ret);
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error network events creating new flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret != 0 && trace_messages) {
bpf_printk("error network events failed to update an existing flow %d\n", ret);
}
}
}
return ret;
}
Expand Down
18 changes: 11 additions & 7 deletions bpf/pkt_drops.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ static inline long pkt_drop_lookup_and_update_flow(flow_id *id, u8 state, u16 fl
aggregate_flow->pkt_drops.latest_state = state;
aggregate_flow->pkt_drops.latest_flags = flags;
aggregate_flow->pkt_drops.latest_drop_cause = reason;
long ret = bpf_map_update_elem(&additional_flow_metrics, id, aggregate_flow, BPF_EXIST);
if (trace_messages && ret != 0) {
bpf_printk("error packet drop updating flow %d\n", ret);
}
return 0;
}
return -1;
Expand Down Expand Up @@ -86,9 +82,17 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
.pkt_drops.latest_flags = flags,
.pkt_drops.latest_drop_cause = reason,
};
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error packet drop creating new flow %d\n", ret);
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error packet drop creating new flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = pkt_drop_lookup_and_update_flow(&id, state, flags, reason, len);
if (ret != 0 && trace_messages) {
bpf_printk("error packet drop updating an existing flow %d\n", ret);
}
}
}

return ret;
Expand Down
18 changes: 11 additions & 7 deletions bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ static inline int rtt_lookup_and_update_flow(flow_id *id, u64 rtt) {
if (aggregate_flow->flow_rtt < rtt) {
aggregate_flow->flow_rtt = rtt;
}
long ret = bpf_map_update_elem(&additional_flow_metrics, id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error rtt updating flow %d\n", ret);
}
return 0;
}
return -1;
Expand Down Expand Up @@ -79,9 +75,17 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
additional_metrics new_flow = {
.flow_rtt = rtt,
};
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error rtt track creating flow %d\n", ret);
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error rtt track creating flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = rtt_lookup_and_update_flow(&id, rtt);
if (trace_messages && ret != 0) {
bpf_printk("error rtt track updating an existing flow %d\n", ret);
}
}
}

return 0;
Expand Down
4 changes: 4 additions & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#define DISCARD 1
#define SUBMIT 0

#define ENOENT 2
#define EEXIST 17
#define EINVAL 22

// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum tcp_flags_t {
FIN_FLAG = 0x01,
Expand Down
Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.

0 comments on commit bf6ad1e

Please sign in to comment.