Skip to content

Commit

Permalink
Merge branch 'refs/heads/main' into access_log_e2e
Browse files Browse the repository at this point in the history
# Conflicts:
#	CHANGES.md
  • Loading branch information
mrproliu committed Jun 4, 2024
2 parents 6a2888a + 446fa2e commit 88f332a
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 343 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Release Notes.
0.7.0
------------------
#### Features
* Upgrade LLVM to `18`.

#### Bug Fixes
* Fixed the issue where `conntrack` could not find the Reply IP in the access log module.
Expand Down
189 changes: 21 additions & 168 deletions bpf/accesslog/syscalls/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

#include "api.h"
#include "socket_opts.h"
#include "socket_data.h"
#include "socket_reader.h"
#include "protocol_analyzer.h"
#include "../common/connection.h"
#include "../common/data_args.h"

#define SOCKET_UPLOAD_CHUNK_LIMIT 12


// openssl read or write
struct {
Expand Down Expand Up @@ -77,58 +78,6 @@ struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} socket_detail_data_queue SEC(".maps");

struct socket_data_upload_event {
__u8 protocol;
__u8 have_reduce_after_chunk;
__u8 direction;
__u8 finished;
__u16 sequence;
__u16 data_len;
__u64 start_time;
__u64 end_time;
__u64 conid;
__u64 randomid;
__u64 data_id;
__u64 total_size;
char buffer[MAX_TRANSMIT_SOCKET_READ_LENGTH + 1];
};
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct socket_data_upload_event);
__uint(max_entries, 1);
} socket_data_upload_event_per_cpu_map SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} socket_data_upload_event_queue SEC(".maps");

struct socket_data_sequence_t {
__u64 data_id;
__u16 sequence;
};
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 1000);
__type(key, __u64);
__type(value, struct socket_data_sequence_t);
} socket_data_sequence_generator SEC(".maps");
static __inline __u16 generate_socket_sequence(__u64 conid, __u64 data_id) {
struct socket_data_sequence_t *seq = bpf_map_lookup_elem(&socket_data_sequence_generator, &conid);
if (seq == NULL) {
struct socket_data_sequence_t data = {};
data.data_id = data_id;
data.sequence = 0;
bpf_map_update_elem(&socket_data_sequence_generator, &conid, &data, BPF_NOEXIST);
return 0;
}
if (seq->data_id != data_id) {
seq->data_id = data_id;
seq->sequence = 0;
} else {
seq->sequence++;
}
return seq->sequence;
}

static __inline void upload_socket_detail(void *ctx, __u64 conid, struct active_connection_t *connection, __u8 func_name, struct sock_data_args_t *data_args, bool ssl, __u64 end_nacs) {
// only send the original socket syscall(not ssl)
Expand Down Expand Up @@ -176,120 +125,6 @@ static __inline void upload_socket_detail(void *ctx, __u64 conid, struct active_
bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU, detail, sizeof(*detail));
}

static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) {
event->sequence = index;
event->data_len = size;
event->finished = is_finished;
event->have_reduce_after_chunk = have_reduce_after_chunk;
if (size <= 0) {
return;
}
asm volatile("%[size] &= 0x7ff;\n" ::[size] "+r"(size) :);
bpf_probe_read(&event->buffer, size & 0x7ff, buf);

bpf_perf_event_output(ctx, &socket_data_upload_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
}

static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct socket_data_upload_event *event, __u8 force_unfinished) {
ssize_t already_send = 0;
#pragma unroll
for (__u8 index = 0; index < SOCKET_UPLOAD_CHUNK_LIMIT; index++) {
// calculate bytes need to send
ssize_t remaining = size - already_send;
size_t need_send_in_chunk = 0;
__u8 have_reduce_after_chunk = 0;
if (remaining > MAX_TRANSMIT_SOCKET_READ_LENGTH) {
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;
have_reduce_after_chunk = 1;
} else {
need_send_in_chunk = remaining;
}

__u32 is_finished = (need_send_in_chunk + already_send) >= size || index == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false;
__u8 sequence = index;
if (force_unfinished == 1 && need_send_in_chunk > 0) {
is_finished = 0;
sequence = generate_socket_sequence(event->conid, event->data_id);
}
__upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);
already_send += need_send_in_chunk;

}
}

#define UPLOAD_PER_SOCKET_DATA_IOV() \
if (iov_index < iovlen) { \
struct iovec cur_iov; \
bpf_probe_read(&cur_iov, sizeof(cur_iov), &iov[iov_index]); \
ssize_t remaining = size - already_send; \
size_t need_send_in_chunk = remaining - cur_iov_sended; \
__u8 have_reduce_after_chunk = 0; \
if (cur_iov_sended + need_send_in_chunk > cur_iov.iov_len) { \
need_send_in_chunk = cur_iov.iov_len - cur_iov_sended; \
if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) { \
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH; \
have_reduce_after_chunk = 1; \
} else { \
iov_index++; \
cur_iov_sended = 0; \
} \
} else if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) { \
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH; \
have_reduce_after_chunk = 1; \
} \
__u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \
__upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event); \
already_send += need_send_in_chunk; \
loop_count++; \
}

static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct socket_data_upload_event *event) {
ssize_t already_send = 0;
ssize_t cur_iov_sended = 0;
__u8 iov_index = 0;
__u8 loop_count = 0;

// each count is same with SOCKET_UPLOAD_CHUNK_LIMIT
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
UPLOAD_PER_SOCKET_DATA_IOV();
}

static __inline void upload_socket_data(void *ctx, __u64 start_time, __u64 end_time, __u64 conid, struct active_connection_t *connection, struct sock_data_args_t *args, ssize_t bytes_count, __u32 existing_msg_type, __u32 data_direction, bool ssl) {
// must have protocol and ssl must same(plain)
// if the connection data is needs to skip upload, then skip
if (connection->protocol == CONNECTION_PROTOCOL_UNKNOWN || connection->ssl != ssl || connection->skip_data_upload == 1) {
return;
}
// generate event
__u32 kZero = 0;
struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero);
if (event == NULL) {
return;
}

// basic data
event->start_time = start_time;
event->end_time = end_time;
event->protocol = connection->protocol;
event->direction = data_direction;
event->conid = conid;
event->randomid = connection->random_id;
event->total_size = bytes_count;
event->data_id = args->data_id;

if (args->buf != NULL) {
upload_socket_data_buf(ctx, args->buf, bytes_count, event, args->ssl_buffer_force_unfinished);
} else if (args->iovec != NULL) {
upload_socket_data_iov(ctx, args->iovec, args->iovlen, bytes_count, event);
}
}

static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
__u32 data_direction, const bool vecs, __u8 func_name, bool ssl) {
__u64 curr_nacs = bpf_ktime_get_ns();
Expand Down Expand Up @@ -356,5 +191,23 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
upload_socket_detail(ctx, conid, conn, func_name, args, ssl, curr_nacs);

// upload the socket data if need
upload_socket_data(ctx, args->start_nacs, curr_nacs, conid, conn, args, bytes_count, msg_type, data_direction, ssl);
struct upload_data_args *upload_data_args = generate_socket_upload_args();
if (upload_data_args != NULL) {
upload_data_args->start_time = args->start_nacs;
upload_data_args->end_time = curr_nacs;
upload_data_args->con_id = conid;
upload_data_args->random_id = conn->random_id;
upload_data_args->socket_data_id = args->data_id;
upload_data_args->socket_data_iovec = args->iovec;
upload_data_args->socket_data_iovlen = args->iovlen;
upload_data_args->bytes_count = bytes_count;
upload_data_args->socket_data_buf = args->buf;
upload_data_args->data_direction = data_direction;
upload_data_args->connection_protocol = conn->protocol;
upload_data_args->connection_ssl = conn->ssl;
upload_data_args->socket_ssl_buffer_force_unfinished = args->ssl_buffer_force_unfinished;
upload_data_args->connection_skip_data_upload = conn->skip_data_upload;
upload_data_args->socket_data_ssl = ssl;
upload_socket_data(ctx, upload_data_args);
};
}
Loading

0 comments on commit 88f332a

Please sign in to comment.