Skip to content

Commit

Permalink
[#178] Remove socket descriptor in sub-processes for killed connections
Browse files Browse the repository at this point in the history
  • Loading branch information
jesperpedersen committed Oct 1, 2021
1 parent eb87d5c commit e33ba21
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 23 deletions.
11 changes: 11 additions & 0 deletions src/include/management.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ extern "C" {
#define MANAGEMENT_CLIENT_FD 16
#define MANAGEMENT_SWITCH_TO 17
#define MANAGEMENT_RELOAD 18
#define MANAGEMENT_REMOVE_FD 19

/**
* Read the management header
Expand Down Expand Up @@ -294,6 +295,16 @@ pgagroal_management_switch_to(SSL* ssl, int socket, char* server);
int
pgagroal_management_reload(SSL* ssl, int socket);

/**
* Management operation: Remove socket descriptor
* @param slot The slot
* @param socket The socket
* @param pid The pid
* @return 0 upon success, otherwise 1
*/
int
pgagroal_management_remove_fd(int32_t slot, int socket, pid_t pid);

#ifdef __cplusplus
}
#endif
Expand Down
53 changes: 53 additions & 0 deletions src/libpgagroal/management.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pgagroal_management_read_payload(int socket, signed char id, int* payload_i, cha
case MANAGEMENT_FLUSH:
case MANAGEMENT_KILL_CONNECTION:
case MANAGEMENT_CLIENT_DONE:
case MANAGEMENT_REMOVE_FD:
if (read_complete(NULL, socket, &buf4[0], sizeof(buf4)))
{
goto error;
Expand Down Expand Up @@ -1155,6 +1156,58 @@ pgagroal_management_reload(SSL* ssl, int fd)
return 1;
}

int
pgagroal_management_remove_fd(int32_t slot, int socket, pid_t pid)
{
char p[MISC_LENGTH];
int fd;
char buf[4];
struct configuration* config;

config = (struct configuration*)shmem;

if (atomic_load(&config->states[slot]) == STATE_NOTINIT)
{
return 0;
}

memset(&p, 0, sizeof(p));
snprintf(&p[0], sizeof(p), ".s.%d", pid);

if (pgagroal_connect_unix_socket(config->unix_socket_dir, &p[0], &fd))
{
pgagroal_log_debug("pgagroal_management_remove_fd: slot %d state %d database %s user %s socket %d pid %d connect: %d",
slot, atomic_load(&config->states[slot]),
config->connections[slot].database, config->connections[slot].username, socket, pid, fd);
errno = 0;
goto error;
}

if (write_header(NULL, fd, MANAGEMENT_REMOVE_FD, slot))
{
pgagroal_log_warn("pgagroal_management_remove_fd: write: %d", fd);
errno = 0;
goto error;
}

pgagroal_write_int32(&buf, socket);
if (write_complete(NULL, fd, &buf, sizeof(buf)))
{
pgagroal_log_warn("pgagroal_management_remove_fd: write: %d %s", fd, strerror(errno));
errno = 0;
goto error;
}

pgagroal_disconnect(fd);

return 0;

error:
pgagroal_disconnect(fd);

return 1;
}

static int
read_complete(SSL* ssl, int socket, void* buf, size_t size)
{
Expand Down
146 changes: 142 additions & 4 deletions src/libpgagroal/pipeline_perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@
/* pgagroal */
#include <pgagroal.h>
#include <logging.h>
#include <management.h>
#include <message.h>
#include <network.h>
#include <pipeline.h>
#include <worker.h>

/* system */
#include <errno.h>
#include <ev.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>

static int performance_initialize(void*, void**, size_t*);
static void performance_start(struct ev_loop *loop, struct worker_io*);
Expand All @@ -61,6 +67,16 @@ struct pipeline performance_pipeline(void)
return pipeline;
}

static int unix_socket = -1;
static struct ev_io io_mgt;

static int fds[MAX_NUMBER_OF_CONNECTIONS];
static bool news[MAX_NUMBER_OF_CONNECTIONS];

static void start_mgt(struct ev_loop *loop);
static void shutdown_mgt(struct ev_loop *loop);
static void accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);

static int
performance_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shmem_size)
{
Expand All @@ -70,11 +86,42 @@ performance_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shme
static void
performance_start(struct ev_loop *loop, struct worker_io* w)
{
char p[MISC_LENGTH];
struct configuration* config;

config = (struct configuration*)shmem;

for (int i = 0; i < config->max_connections; i++)
{
fds[i] = config->connections[i].fd;
news[i] = config->connections[i].new;
}

memset(&p, 0, sizeof(p));
snprintf(&p[0], sizeof(p), ".s.%d", getpid());

if (pgagroal_bind_unix_socket(config->unix_socket_dir, &p[0], &unix_socket))
{
pgagroal_log_fatal("pgagroal: Could not bind to %s/%s", config->unix_socket_dir, &p[0]);
goto error;
}

start_mgt(loop);

return;

error:

exit_code = WORKER_FAILURE;
running = 0;
ev_break(loop, EVBREAK_ALL);
return;
}

static void
performance_stop(struct ev_loop *loop, struct worker_io* w)
{
shutdown_mgt(loop);
}

static void
Expand All @@ -93,6 +140,7 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
int status = MESSAGE_STATUS_ERROR;
struct worker_io* wi = NULL;
struct message* msg = NULL;
struct configuration* config = NULL;

wi = (struct worker_io*)watcher;

Expand Down Expand Up @@ -127,7 +175,10 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
return;

client_error:
pgagroal_log_warn("[C] Client error: %s (socket %d status %d)", strerror(errno), wi->client_fd, status);
config = (struct configuration*)shmem;
pgagroal_log_warn("[C] Client error (slot %d database %s user %s): %s (socket %d status %d)",
wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username,
strerror(errno), wi->client_fd, status);
pgagroal_log_message(msg);
errno = 0;

Expand All @@ -137,7 +188,10 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
return;

server_error:
pgagroal_log_warn("[C] Server error: %s (socket %d status %d)", strerror(errno), wi->server_fd, status);
config = (struct configuration*)shmem;
pgagroal_log_warn("[C] Server error (slot %d database %s user %s): %s (socket %d status %d)",
wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username,
strerror(errno), wi->server_fd, status);
pgagroal_log_message(msg);
errno = 0;

Expand All @@ -154,6 +208,7 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents)
bool fatal = false;
struct worker_io* wi = NULL;
struct message* msg = NULL;
struct configuration* config = NULL;

wi = (struct worker_io*)watcher;

Expand Down Expand Up @@ -194,7 +249,10 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents)
return;

client_error:
pgagroal_log_warn("[S] Client error: %s (socket %d status %d)", strerror(errno), wi->client_fd, status);
config = (struct configuration*)shmem;
pgagroal_log_warn("[S] Client error (slot %d database %s user %s): %s (socket %d status %d)",
wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username,
strerror(errno), wi->client_fd, status);
pgagroal_log_message(msg);
errno = 0;

Expand All @@ -204,7 +262,10 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents)
return;

server_error:
pgagroal_log_warn("[S] Server error: %s (socket %d status %d)", strerror(errno), wi->server_fd, status);
config = (struct configuration*)shmem;
pgagroal_log_warn("[S] Server error (slot %d database %s user %s): %s (socket %d status %d)",
wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username,
strerror(errno), wi->server_fd, status);
pgagroal_log_message(msg);
errno = 0;

Expand All @@ -213,3 +274,80 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents)
ev_break(loop, EVBREAK_ALL);
return;
}

static void
start_mgt(struct ev_loop *loop)
{
memset(&io_mgt, 0, sizeof(struct ev_io));
ev_io_init(&io_mgt, accept_cb, unix_socket, EV_READ);
ev_io_start(loop, &io_mgt);
}

static void
shutdown_mgt(struct ev_loop* loop)
{
char p[MISC_LENGTH];
struct configuration* config = NULL;

config = (struct configuration*)shmem;

memset(&p, 0, sizeof(p));
snprintf(&p[0], sizeof(p), ".s.%d", getpid());

ev_io_stop(loop, &io_mgt);
pgagroal_disconnect(unix_socket);
errno = 0;
pgagroal_remove_unix_socket(config->unix_socket_dir, &p[0]);
errno = 0;
}

static void
accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
struct sockaddr_in client_addr;
socklen_t client_addr_length;
int client_fd;
signed char id;
int32_t slot;
int payload_i;
char* payload_s = NULL;

pgagroal_log_trace("accept_cb: sockfd ready (%d)", revents);

if (EV_ERROR & revents)
{
pgagroal_log_debug("accept_cb: invalid event: %s", strerror(errno));
errno = 0;
return;
}

client_addr_length = sizeof(client_addr);
client_fd = accept(watcher->fd, (struct sockaddr *)&client_addr, &client_addr_length);
if (client_fd == -1)
{
pgagroal_log_debug("accept: %s (%d)", strerror(errno), watcher->fd);
errno = 0;
return;
}

/* Process internal management request -- f.ex. returning a file descriptor to the pool */
pgagroal_management_read_header(client_fd, &id, &slot);
pgagroal_management_read_payload(client_fd, id, &payload_i, &payload_s);

switch (id)
{
case MANAGEMENT_REMOVE_FD:
pgagroal_log_debug("pgagroal: Management remove file descriptor: Slot %d FD %d", slot, payload_i);
if (fds[slot] == payload_i && !news[slot])
{
pgagroal_disconnect(payload_i);
fds[slot] = 0;
}
break;
default:
pgagroal_log_debug("pgagroal: Unsupported management id: %d", id);
break;
}

pgagroal_disconnect(client_fd);
}
Loading

0 comments on commit e33ba21

Please sign in to comment.