bpf: create aggregated_flows / dns_flows entries with BPF_NOEXIST (reviewed revision)

This text sits before the first "diff --git" header, where git-apply and patch(1) skip it.

Summary of the patch, as visible in the hunks below:
  - New map entries are inserted with BPF_NOEXIST instead of BPF_ANY. When the insert returns
    -EEXIST the code looks the entry up again and updates it in place; -EEXIST itself is not
    logged.
  - Values returned by bpf_map_lookup_elem() are modified through the returned pointer; the
    follow-up bpf_map_update_elem() write-backs are removed.
  - flows.c gains update_existing_flow(); network_events_monitoring.h gains
    lookup_and_update_existing_flow_network_events().
  - ENOENT / EINVAL move from dns_tracker.h to types.h, next to the new EEXIST.
  - BPF_PRINTK in flows_filter.h is gated at run time by trace_messages instead of the
    compile-time ENABLE_BPF_PRINTK switch.

Review changes applied on top of the submitted patch:
  1. flows_filter.h: BPF_PRINTK expanded to a bare "if (trace_messages) bpf_printk(...)", so an
     "else" written after a BPF_PRINTK(...) call would bind to the hidden "if". It is now wrapped
     in a braced do { } while (0).
  2. network_events_monitoring.h: the patch dropped the zero-fill of the cookie buffer when it
     moved the buffer into the new helper. Only md_len bytes are written by bpf_probe_read(),
     but MAX_EVENT_MD bytes are copied into the map and handed to md_already_exists(). The
     buffer is zero-initialised again.
  3. flows.c: spelling in added lines ("exising" -> "existing" in the trace message,
     "userpace" -> "userspace" and "deal" -> "deals" in comments). Removed and context lines are
     untouched so they still match the pre-image.
  4. Short header comments added to the two new functions and the macro; the affected hunk
     headers are adjusted for the extra lines.

Notes for whoever regenerates this patch:
  - The submitted text had its line breaks collapsed. Indentation and blank context lines below
    are reconstructed from the hunk line counts and should be checked against the tree.
  - The "index" post-image hashes of flows.c, flows_filter.h and network_events_monitoring.h, and
    the four prebuilt pkg/ebpf/*.o objects, predate the review changes and must be regenerated.

diff --git a/bpf/dns_tracker.h b/bpf/dns_tracker.h
index d40e74d3f..c9db3a554 100644
--- a/bpf/dns_tracker.h
+++ b/bpf/dns_tracker.h
@@ -9,8 +9,6 @@
 #define DNS_DEFAULT_PORT 53
 #define DNS_QR_FLAG 0x8000
 #define UDP_MAXMSG 512
-#define EINVAL 22
-#define ENOENT 2
 
 struct dns_header {
     u16 id;
@@ -91,8 +89,11 @@ static __always_inline int track_dns_packet(struct __sk_buff *skb, pkt_info *pkt
 
         if ((flags & DNS_QR_FLAG) == 0) { /* dns query */
             fill_dns_id(pkt->id, &dns_req, dns_id, false);
-            if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
-                bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
+            ret = bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_NOEXIST);
+            if (ret != 0) {
+                if (trace_messages && ret != -EEXIST) {
+                    bpf_printk("error creating new dns entry %d\n", ret);
+                }
             }
         } else { /* dns response */
             fill_dns_id(pkt->id, &dns_req, dns_id, true);
diff --git a/bpf/flows.c b/bpf/flows.c
index 7cf822014..0533f9ad2 100644
--- a/bpf/flows.c
+++ b/bpf/flows.c
@@ -51,6 +51,26 @@
  */
 #include "network_events_monitoring.h"
 
+// Folds one more packet of len bytes into an already stored flow entry: bumps the counters,
+// refreshes the end timestamp, ORs in pkt->flags and overwrites the DSCP and DNS fields.
+static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, int dns_errno,
+                                        u64 len) {
+    aggregate_flow->packets += 1;
+    aggregate_flow->bytes += len;
+    aggregate_flow->end_mono_time_ts = pkt->current_ts;
+    // it might happen that start_mono_time hasn't been set due to
+    // the way percpu hashmap deals with concurrent map entries
+    if (aggregate_flow->start_mono_time_ts == 0) {
+        aggregate_flow->start_mono_time_ts = pkt->current_ts;
+    }
+    aggregate_flow->flags |= pkt->flags;
+    aggregate_flow->dscp = pkt->dscp;
+    aggregate_flow->dns_record.id = pkt->dns_id;
+    aggregate_flow->dns_record.flags = pkt->dns_flags;
+    aggregate_flow->dns_record.latency = pkt->dns_latency;
+    aggregate_flow->dns_record.errno = dns_errno;
+}
+
 static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     // If sampling is defined, will only parse 1 out of "sampling" flows
     if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
@@ -70,6 +90,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     void *data_end = (void *)(long)skb->data_end;
     void *data = (void *)(long)skb->data;
     struct ethhdr *eth = (struct ethhdr *)data;
+    u64 len = skb->len;
 
     if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
         return TC_ACT_OK;
@@ -93,33 +114,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
     flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
     if (aggregate_flow != NULL) {
-        aggregate_flow->packets += 1;
-        aggregate_flow->bytes += skb->len;
-        aggregate_flow->end_mono_time_ts = pkt.current_ts;
-        // it might happen that start_mono_time hasn't been set due to
-        // the way percpu hashmap deal with concurrent map entries
-        if (aggregate_flow->start_mono_time_ts == 0) {
-            aggregate_flow->start_mono_time_ts = pkt.current_ts;
-        }
-        aggregate_flow->flags |= pkt.flags;
-        aggregate_flow->dscp = pkt.dscp;
-        aggregate_flow->dns_record.id = pkt.dns_id;
-        aggregate_flow->dns_record.flags = pkt.dns_flags;
-        aggregate_flow->dns_record.latency = pkt.dns_latency;
-        aggregate_flow->dns_record.errno = dns_errno;
-        long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
-        if (ret != 0) {
-            // usually error -16 (-EBUSY) is printed here.
-            // In this case, the flow is dropped, as submitting it to the ringbuffer would cause
-            // a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
-            // which can't be deduplicated.
-            // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
-            if (trace_messages) {
-                bpf_printk("error updating flow %d\n", ret);
-            }
-            // Update global counter for hashmap update errors
-            increase_counter(HASHMAP_FLOWS_DROPPED);
-        }
+        update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
     } else {
         // Key does not exist in the map, and will need to create a new entry.
         u64 rtt = 0;
@@ -128,7 +123,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         }
         flow_metrics new_flow = {
             .packets = 1,
-            .bytes = skb->len,
+            .bytes = len,
             .start_mono_time_ts = pkt.current_ts,
             .end_mono_time_ts = pkt.current_ts,
             .flags = pkt.flags,
@@ -140,31 +135,42 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
             .flow_rtt = rtt,
         };
 
-        // even if we know that the entry is new, another CPU might be concurrently inserting a flow
-        // so we need to specify BPF_ANY
-        long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
+        long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
         if (ret != 0) {
-            // usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
-            // In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
-            // a repeated INTERSECTION of flows (different flows aggregating different packets),
-            // which can be re-aggregated at userpace.
-            // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
-            if (trace_messages) {
+            if (trace_messages && ret != -EEXIST) {
                 bpf_printk("error adding flow %d\n", ret);
             }
-
-            new_flow.errno = -ret;
-            flow_record *record =
-                (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
-            if (!record) {
-                if (trace_messages) {
-                    bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
+            if (ret == -EEXIST) {
+                flow_metrics *aggregate_flow =
+                    (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
+                if (aggregate_flow != NULL) {
+                    update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
+                } else {
+                    if (trace_messages) {
+                        bpf_printk("failed to update an existing flow\n");
+                    }
+                    // Update global counter for hashmap update errors
+                    increase_counter(HASHMAP_FLOWS_DROPPED);
+                }
+            } else {
+                // usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
+                // In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
+                // a repeated INTERSECTION of flows (different flows aggregating different packets),
+                // which can be re-aggregated at userspace.
+                // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
+                new_flow.errno = -ret;
+                flow_record *record =
+                    (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
+                if (!record) {
+                    if (trace_messages) {
+                        bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
+                    }
+                    return TC_ACT_OK;
                 }
-                return TC_ACT_OK;
+                record->id = id;
+                record->metrics = new_flow;
+                bpf_ringbuf_submit(record, 0);
             }
-            record->id = id;
-            record->metrics = new_flow;
-            bpf_ringbuf_submit(record, 0);
         }
     }
     return TC_ACT_OK;
diff --git a/bpf/flows_filter.h b/bpf/flows_filter.h
index 3a541c6d5..e712a605a 100644
--- a/bpf/flows_filter.h
+++ b/bpf/flows_filter.h
@@ -7,13 +7,14 @@
 
 #include "utils.h"
 
-// remove the comment below to enable debug prints
-//#define ENABLE_BPF_PRINTK
-#ifdef ENABLE_BPF_PRINTK
-#define BPF_PRINTK(fmt, args...) bpf_printk(fmt, ##args)
-#else
-#define BPF_PRINTK(fmt, args...)
-#endif
+// Debug print, emitted only when trace_messages is set. Wrapped in do/while (0) so the macro
+// expands to a single statement and cannot capture an "else" that follows the call site.
+#define BPF_PRINTK(fmt, args...)                                                                   \
+    do {                                                                                           \
+        if (trace_messages) {                                                                      \
+            bpf_printk(fmt, ##args);                                                               \
+        }                                                                                          \
+    } while (0)
 
 static __always_inline int is_zero_ip(u8 *ip, u8 len) {
     for (int i = 0; i < len; i++) {
diff --git a/bpf/network_events_monitoring.h b/bpf/network_events_monitoring.h
index 732156c51..45f88bff3 100644
--- a/bpf/network_events_monitoring.h
+++ b/bpf/network_events_monitoring.h
@@ -29,17 +29,40 @@ static inline bool md_already_exists(u8 network_events[MAX_NETWORK_EVENTS][MAX_E
     return false;
 }
 
+// Reads md_len bytes of the user cookie and, if a flow for id is present in aggregated_flows,
+// refreshes its end timestamp and stores the cookie in the next network_events slot unless
+// md_already_exists() reports it. Returns 0 when the flow was handled, -1 otherwise.
+static inline int lookup_and_update_existing_flow_network_events(flow_id *id, u8 md_len,
+                                                                 u8 *user_cookie) {
+    u8 cookie[MAX_EVENT_MD] = {0}; // zero-fill: bpf_probe_read writes only md_len bytes
+
+    bpf_probe_read(cookie, md_len, user_cookie);
+
+    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
+    if (aggregate_flow != NULL) {
+        u8 idx = aggregate_flow->network_events_idx;
+        aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
+        // Needed to check length here again to keep JIT verifier happy
+        if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
+            if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
+                __builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
+                aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
+            }
+            return 0;
+        }
+    }
+    return -1;
+}
+
 static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_metadata *md) {
     u8 dscp = 0, protocol = 0, md_len = 0;
     u16 family = 0, flags = 0;
     u8 *user_cookie = NULL;
-    u8 cookie[MAX_EVENT_MD];
     long ret = 0;
     u64 len = 0;
     flow_id id;
 
     __builtin_memset(&id, 0, sizeof(id));
-    __builtin_memset(cookie, 0, sizeof(cookie));
 
     md_len = BPF_CORE_READ(md, user_cookie_len);
     user_cookie = (u8 *)BPF_CORE_READ(md, user_cookie);
@@ -47,8 +70,6 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
         return -1;
     }
 
-    bpf_probe_read(cookie, md_len, user_cookie);
-
     id.if_index = BPF_CORE_READ(md, in_ifindex);
 
     len = BPF_CORE_READ(skb, len);
@@ -88,31 +109,10 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
 
     for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
         id.direction = dir;
-        flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
-        if (aggregate_flow != NULL) {
-            u8 idx = aggregate_flow->network_events_idx;
-            aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
-            // Needed to check length here again to keep JIT verifier happy
-            if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
-                if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
-                    __builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
-                    aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
-                }
-                ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
-                if (ret == 0) {
-                    return 0;
-                }
-            } else {
-                return -1;
-            }
-        }
-    }
-
-    if (ret != 0) {
-        if (trace_messages) {
-            bpf_printk("error network events updating existing flow %d\n", ret);
+        ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
+        if (ret == 0) {
+            return ret;
         }
-        return ret;
     }
 
     // there is no matching flows so lets create new one and add the network event metadata
@@ -128,9 +128,17 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
     };
     bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
     new_flow.network_events_idx++;
-    ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
-    if (trace_messages && ret != 0) {
-        bpf_printk("error network events creating new flow %d\n", ret);
+    ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
+    if (ret != 0) {
+        if (trace_messages && ret != -EEXIST) {
+            bpf_printk("error network events creating new flow %d\n", ret);
+        }
+        if (ret == -EEXIST) {
+            ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
+            if (ret != 0 && trace_messages) {
+                bpf_printk("error network events failed to update an existing flow %d\n", ret);
+            }
+        }
     }
     return ret;
 }
diff --git a/bpf/pkt_drops.h b/bpf/pkt_drops.h
index 0a65215df..7eea24a84 100644
--- a/bpf/pkt_drops.h
+++ b/bpf/pkt_drops.h
@@ -17,10 +17,6 @@ static inline long pkt_drop_lookup_and_update_flow(flow_id *id, u8 state, u16 fl
         aggregate_flow->pkt_drops.latest_state = state;
         aggregate_flow->pkt_drops.latest_flags = flags;
         aggregate_flow->pkt_drops.latest_drop_cause = reason;
-        long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_EXIST);
-        if (trace_messages && ret != 0) {
-            bpf_printk("error packet drop updating flow %d\n", ret);
-        }
         return 0;
     }
     return -1;
@@ -93,9 +89,17 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
         .pkt_drops.latest_flags = flags,
         .pkt_drops.latest_drop_cause = reason,
     };
-    ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
-    if (trace_messages && ret != 0) {
-        bpf_printk("error packet drop creating new flow %d\n", ret);
+    ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
+    if (ret != 0) {
+        if (trace_messages && ret != -EEXIST) {
+            bpf_printk("error packet drop creating new flow %d\n", ret);
+        }
+        if (ret == -EEXIST) {
+            ret = pkt_drop_lookup_and_update_flow(&id, state, flags, reason, len);
+            if (ret != 0 && trace_messages) {
+                bpf_printk("error packet drop updating an existing flow %d\n", ret);
+            }
+        }
     }
 
     return ret;
diff --git a/bpf/rtt_tracker.h b/bpf/rtt_tracker.h
index 0cef492cd..9e9822b2e 100644
--- a/bpf/rtt_tracker.h
+++ b/bpf/rtt_tracker.h
@@ -17,10 +17,6 @@ static inline int rtt_lookup_and_update_flow(flow_id *id, u16 flags, u64 rtt) {
         if (aggregate_flow->flow_rtt < rtt) {
             aggregate_flow->flow_rtt = rtt;
         }
-        long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY);
-        if (trace_messages && ret != 0) {
-            bpf_printk("error rtt updating flow %d\n", ret);
-        }
         return 0;
     }
     return -1;
@@ -87,9 +83,17 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
         .flow_rtt = rtt,
         .dscp = dscp,
     };
-    ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
-    if (trace_messages && ret != 0) {
-        bpf_printk("error rtt track creating flow %d\n", ret);
+    ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
+    if (ret != 0) {
+        if (trace_messages && ret != -EEXIST) {
+            bpf_printk("error rtt track creating flow %d\n", ret);
+        }
+        if (ret == -EEXIST) {
+            ret = rtt_lookup_and_update_flow(&id, flags, rtt);
+            if (trace_messages && ret != 0) {
+                bpf_printk("error rtt track updating an existing flow %d\n", ret);
+            }
+        }
     }
 
     return 0;
diff --git a/bpf/types.h b/bpf/types.h
index 6fdff183e..01a6141a9 100644
--- a/bpf/types.h
+++ b/bpf/types.h
@@ -9,6 +9,10 @@
 #define DISCARD 1
 #define SUBMIT 0
 
+#define ENOENT 2
+#define EEXIST 17
+#define EINVAL 22
+
 // Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
 typedef enum tcp_flags_t {
     FIN_FLAG = 0x01,
diff --git a/pkg/ebpf/bpf_arm64_bpfel.o b/pkg/ebpf/bpf_arm64_bpfel.o
index 983545e61..23786f134 100644
Binary files a/pkg/ebpf/bpf_arm64_bpfel.o and b/pkg/ebpf/bpf_arm64_bpfel.o differ
diff --git a/pkg/ebpf/bpf_powerpc_bpfel.o b/pkg/ebpf/bpf_powerpc_bpfel.o
index b2b546195..af1069434 100644
Binary files a/pkg/ebpf/bpf_powerpc_bpfel.o and b/pkg/ebpf/bpf_powerpc_bpfel.o differ
diff --git a/pkg/ebpf/bpf_s390_bpfeb.o b/pkg/ebpf/bpf_s390_bpfeb.o
index 89ab5f025..e1584583d 100644
Binary files a/pkg/ebpf/bpf_s390_bpfeb.o and b/pkg/ebpf/bpf_s390_bpfeb.o differ
diff --git a/pkg/ebpf/bpf_x86_bpfel.o b/pkg/ebpf/bpf_x86_bpfel.o
index 426917b9a..0a2744bc1 100644
Binary files a/pkg/ebpf/bpf_x86_bpfel.o and b/pkg/ebpf/bpf_x86_bpfel.o differ