Skip to content

Commit

Permalink
chore: refactor jobs structure
Browse files Browse the repository at this point in the history
  • Loading branch information
exbotanical committed Nov 11, 2023
1 parent 45acf50 commit fc8b41d
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 158 deletions.
Binary file removed local
Binary file not shown.
3 changes: 2 additions & 1 deletion src/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ extern char hostname[SMALL_BUFFER];

extern array_t *job_queue;

extern const char *job_status_names[];
extern const char *job_state_names[];

#endif /* CONSTANTS_H */
192 changes: 120 additions & 72 deletions src/job.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "job.h"

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
Expand All @@ -13,6 +14,9 @@
#include "opt-constants.h"
#include "util.h"

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// need to free
static char* create_uuid(void) {
char uuid[UUID_STR_LEN];
Expand All @@ -26,21 +30,39 @@ static char* create_uuid(void) {

// TODO: pass entry by value? Otherwise if entry is freed before we call fork,
// we're fucked
static Job* new_job(CronEntry* entry) {
static Job* new_cronjob(CronEntry* entry) {
Job* job = xmalloc(sizeof(Job));
job->ret = -1;
job->pid = -1;
job->mailer_pid = -1;
job->status = PENDING;
job->state = PENDING;
job->cmd = s_copy(entry->cmd); // TODO: free
job->ident = create_uuid();
job->type = CMD;

ht_record* r = ht_search(entry->parent->vars, MAILTO_ENVVAR);
job->mailto = s_copy(r ? r->value : entry->parent->uname);

return job;
}

static Job* new_mailjob(Job* og_job) {
Job* job = xmalloc(sizeof(Job));
job->ret = -1;
job->pid = -1;
job->state = PENDING;
job->ident = create_uuid();
job->type = MAIL;
job->mailto = og_job->mailto;

char mail_cmd[MED_BUFFER];
sprintf(mail_cmd, MAILCMD_FMT, MAILCMD_PATH, DAEMON_IDENT, hostname,
og_job->cmd, og_job->mailto);

job->cmd = s_copy(mail_cmd); // TODO: free

return job;
}

static bool await_job(pid_t pid, int* status) {
int r = waitpid(pid, status, WNOHANG);

Expand All @@ -59,8 +81,46 @@ static bool await_job(pid_t pid, int* status) {
return false;
}

void enqueue_job(CronEntry* entry) {
Job* job = new_job(entry);
static unsigned int temporary_mail_count = 0;

// TODO: [bash] <defunct>
// ps aux | grep './chronic' | grep -v 'grep' | awk '{print $2}' | wc -l
// ps aux | grep './chronic' | grep -v 'grep' | awk '{print $2}' | xargs kill
static void run_mailjob(Job* og_job) {
Job* job = new_mailjob(og_job);
array_push(job_queue, job);

printlogf("[job %s] going to run mail cmd: %s\n", job->ident, job->cmd);

if ((job->pid = fork()) == 0) {
setsid();
dup2(STDERR_FILENO, STDOUT_FILENO);

FILE* mail_pipe = popen(job->cmd, "w");

if (mail_pipe == NULL) {
perror("popen");
exit(EXIT_FAILURE);
}

fprintf(mail_pipe, fmt_str("This is the body of the email (num %d).\n",
++temporary_mail_count));

if (pclose(mail_pipe) == -1) {
perror("pclose");
exit(EXIT_FAILURE);
}
}
}

// void enqueue_job(CronEntry* entry) {
// Job* job = new_cronjob(entry);
// array_push(job_queue, job);
// }

void run_cronjob(CronEntry* entry) {
Job* job = new_cronjob(entry);
array_push(job_queue, job);

char* home = ht_get(entry->parent->vars, HOMEDIR_ENVVAR);
char* shell = ht_get(entry->parent->vars, SHELL_ENVVAR);
Expand All @@ -87,81 +147,69 @@ void enqueue_job(CronEntry* entry) {

printlogf("[job %s] New running job with pid %d\n", job->ident, job->pid);

job->status = RUNNING;
array_push(job_queue, job);
job->state = RUNNING;
}

unsigned int temporary_mail_count = 0;
// TODO: [bash] <defunct>
// execl("/bin/sh", "/bin/sh", "-c", job->cmd, NULL);
// ps aux | grep './chronic' | grep -v 'grep' | awk '{print $2}' | wc -l
// ps aux | grep './chronic' | grep -v 'grep' | awk '{print $2}' | xargs kill
void run_mailjob(Job* mailjob) {
mailjob->status = MAIL_RUNNING;
printlogf("[job %s] transition EXITED->MAIL_RUNNING\n", mailjob->ident);

char mail_cmd[MED_BUFFER];

sprintf(mail_cmd, MAILCMD_FMT, MAILCMD_PATH, DAEMON_IDENT, hostname,
mailjob->cmd, mailjob->mailto);

printlogf("going to run mailcmd: %s\n", mail_cmd);

if ((mailjob->mailer_pid = fork()) == 0) {
setsid();
dup2(STDERR_FILENO, STDOUT_FILENO);

FILE* mail_pipe = popen(mail_cmd, "w");

if (mail_pipe == NULL) {
perror("popen");
exit(EXIT_FAILURE);
}

fprintf(mail_pipe, fmt_str("This is the body of the email (num %d).\n",
++temporary_mail_count));

if (pclose(mail_pipe) == -1) {
perror("pclose");
exit(EXIT_FAILURE);
// ps aux | grep './chronic' | awk '{print $2}' | xargs kill
static void reap_job(Job* job) {
switch (job->state) {
case PENDING:
case EXITED:
default:
break;
case RUNNING: {
int status;
if (await_job(job->pid, &status)) {
printlogf("[job %s] transition RUNNING->EXITED (pid=%d, status=%d\n",
job->ident, job->pid, status);

job->ret = status;
job->state = EXITED;
job->pid = -1;

if (job->type == CMD) {
// run_mailjob(job);
}
}
}
}
}
// ps aux | grep './chronic' | awk '{print $2}' | xargs kill
void reap_job(Job* job) {
// This shouldn't happen, but just in case...
if (job->status == RESOLVED) {
printlogf(
"[job %s] Tried to reap job with status=%s. This is a "
"bug.\n",
job->ident, job_status_names[job->status]);
return;

static void* reap_routine(void* _arg) {
while (true) {
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);

printlogf("in reaper thread\n");
foreach (job_queue, i) {
Job* job = array_get(job_queue, i);
// TODO: null checks everywhere ^
reap_job(job);

// We need to be careful to only remove EXITED jobs, else
// we'll end up with zombley processes
if (job->state == EXITED) {
array_remove(job_queue, i);
}
}

pthread_mutex_unlock(&mutex);
}

int status;
return NULL;
}

if (job->status == MAIL_RUNNING) {
printlogf("[job %s] await (pid=%d, status=MAIL_RUNNING)\n", job->ident,
job->mailer_pid);
void init_reap_routine(void) {
pthread_t reaper_thread_id;
pthread_attr_t attr;
int rc = pthread_attr_init(&attr);
rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

// Main job is done, await the mail job
if (await_job(job->mailer_pid, &status)) {
printlogf(
"[job %s] transition MAIL_RUNNING->RESOLVED (pid=%d, status=%d)\n",
job->ident, job->mailer_pid, status);
pthread_create(&reaper_thread_id, &attr, &reap_routine, NULL);
}

job->status = RESOLVED;
job->mailer_pid = -1;
// TODO: cleanup fns
}
} else {
if (await_job(job->pid, &status)) {
printlogf("[job %s] transition RUNNING->EXITED (pid=%d, status=%d)\n",
job->ident, job->pid, status);

job->ret = status;
job->status = EXITED;
job->pid = -1;
}
}
void signal_reap_routine(void) {
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
19 changes: 10 additions & 9 deletions src/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@
#include "cronentry.h"
#include "libhash/libhash.h"

typedef enum { PENDING, RUNNING, EXITED, MAIL_RUNNING, RESOLVED } JobStatus;
typedef enum { PENDING, RUNNING, EXITED } JobState;

typedef enum { CMD, MAIL } JobType;

typedef struct {
char *ident;
char *cmd;
pid_t pid;
pid_t mailer_pid;
JobStatus status;
int ret;
JobState state;
char *mailto;
char *ident;
JobType type;
int ret;
} Job;

void reap_job(Job *job);

void enqueue_job(CronEntry *entry);
void run_cronjob(CronEntry *entry);

void run_mailjob(Job *mailjob);
void init_reap_routine(void);
void signal_reap_routine(void);

#endif /* JOB_H */
19 changes: 4 additions & 15 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
#include "job.h"
#include "log.h"
#include "opt-constants.h"
#include "reaper.h"
#include "util.h"

pid_t daemon_pid;
char hostname[SMALL_BUFFER];

array_t* job_queue;
// array_t* mail_queue;

const char* job_status_names[] = {X(PENDING), X(RUNNING), X(EXITED),
X(RESOLVED), X(MAIL_RUNNING)};
const char* job_state_names[] = {X(PENDING), X(RUNNING), X(EXITED)};

// Desired interval in seconds between loop iterations
const short loop_interval = 60;
Expand Down Expand Up @@ -73,8 +72,7 @@ int main(int argc, char** argv) {

update_db(db, start_time, &usr, NULL);
// TODO: sys
pthread_t reaper_thread_id;
pthread_create(&reaper_thread_id, NULL, &reap_routine, NULL);
init_reap_routine();

while (true) {
printlogf("\n----------------\n");
Expand Down Expand Up @@ -107,21 +105,12 @@ int main(int argc, char** argv) {
foreach (ct->entries, i) {
CronEntry* entry = array_get(ct->entries, i);
if (entry->next == rounded_timestamp) {
enqueue_job(entry);
run_cronjob(entry);
}
}
}
}

foreach (job_queue, i) {
Job* job = array_get(job_queue, i);
if (job->status == EXITED) {
printlogf("\n[mailjob %s] about to run mailjob (status %s)\n\n",
job->ident, job_status_names[job->status]);
run_mailjob(job);
}
}

update_db(db, current_iter_time, &usr, NULL);
}

Expand Down
43 changes: 0 additions & 43 deletions src/reaper.c

This file was deleted.

11 changes: 0 additions & 11 deletions src/reaper.h

This file was deleted.

Loading

0 comments on commit fc8b41d

Please sign in to comment.