Skip to content

Commit

Permalink
rg_system: Added message queue to rg_task
Browse files Browse the repository at this point in the history
Having a built-in messaging system for all tasks is very convenient.

Tasks are free to ignore it or to use it to receive work units. There is no longer a need for separate queues!

This also allows tasks to receive and react to some standard messages like STOP/SHUTDOWN.

An additional benefit is that this will be very easy to make it work in SDL2 compared to full fledged queues. (I have already made a very unsafe and innefficient implementation to showcase, but it needs more SDL_Mutex/SDL_Cond...).
  • Loading branch information
ducalex committed Aug 22, 2024
1 parent 15fa07e commit cecd9ef
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 82 deletions.
37 changes: 12 additions & 25 deletions components/retro-go/rg_display.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#define LCD_BUFFER_LENGTH (RG_SCREEN_WIDTH * 4) // In pixels

// static rg_display_driver_t driver;
static rg_queue_t *display_task_queue;
static rg_task_t *display_task_queue;
static rg_display_counters_t counters;
static rg_display_config_t config;
static rg_surface_t *osd;
Expand Down Expand Up @@ -299,17 +299,12 @@ static bool load_border_file(const char *filename)
IRAM_ATTR
static void display_task(void *arg)
{
display_task_queue = rg_queue_create(1, sizeof(rg_surface_t *));
rg_task_msg_t msg;

while (1)
while (rg_task_peek(&msg))
{
const rg_surface_t *update;

rg_queue_peek(display_task_queue, &update, -1);
// rg_queue_receive(display_task_queue, &update, -1);

// Received a shutdown request!
if (update == (void *)-1)
if (msg.type == RG_TASK_MSG_STOP)
break;

if (display.changed)
Expand All @@ -325,15 +320,12 @@ static void display_task(void *arg)
display.changed = false;
}

write_update(update);
write_update(msg.dataPtr);

rg_queue_receive(display_task_queue, &update, -1);
rg_task_receive(&msg);

lcd_sync();
}

rg_queue_free(display_task_queue);
display_task_queue = NULL;
}

void rg_display_force_redraw(void)
Expand Down Expand Up @@ -453,17 +445,17 @@ void rg_display_submit(const rg_surface_t *update, uint32_t flags)
display.changed = true;
}

rg_queue_send(display_task_queue, &update, 1000);
rg_task_send(display_task_queue, &(rg_task_msg_t){.dataPtr = update});

counters.blockTime += rg_system_timer() - time_start;
counters.totalFrames++;
}

bool rg_display_sync(bool block)
{
while (block && rg_queue_messages_waiting(display_task_queue))
continue; // Wait until display queue is done
return rg_queue_messages_waiting(display_task_queue) == 0;
while (block && rg_task_messages_waiting(display_task_queue))
continue; // We should probably yield?
return !rg_task_messages_waiting(display_task_queue);
}

void rg_display_write(int left, int top, int width, int height, int stride, const uint16_t *buffer, uint32_t flags)
Expand Down Expand Up @@ -551,12 +543,7 @@ void rg_display_clear(uint16_t color_le)

void rg_display_deinit(void)
{
void *stop = (void *)-1;
rg_queue_send(display_task_queue, &stop, 1000);
// display_task_queue has len == 1. When xQueueSend returns, we know that the only
// thing in it is our quit request which won't touch the LCD or SPI anymore
// while (display_task_queue)
// rg_task_yield();
rg_task_send(display_task_queue, &(rg_task_msg_t){.type = RG_TASK_MSG_STOP});
lcd_deinit();
RG_LOGI("Display terminated.\n");
}
Expand Down Expand Up @@ -585,7 +572,7 @@ void rg_display_init(void)
.changed = true,
};
lcd_init();
rg_task_create("rg_display", &display_task, NULL, 4 * 1024, RG_TASK_PRIORITY_6, 1);
display_task_queue = rg_task_create("rg_display", &display_task, NULL, 4 * 1024, RG_TASK_PRIORITY_6, 1);
if (config.border_file)
load_border_file(config.border_file);
RG_LOGI("Display ready.\n");
Expand Down
117 changes: 80 additions & 37 deletions components/retro-go/rg_system.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ struct rg_task_s
{
void (*func)(void *arg);
void *arg;
// rg_queue_t *queue;
// bool blocked;
#ifdef ESP_PLATFORM
QueueHandle_t queue;
TaskHandle_t handle;
#else
rg_task_msg_t msg;
int msgWaiting;
SDL_threadID handle;
#endif
char name[16];
Expand Down Expand Up @@ -489,9 +492,9 @@ static void task_wrapper(void *arg)
{
rg_task_t *task = arg;
task->handle = xTaskGetCurrentTaskHandle();
// task->queue = rg_queue_create(1, sizeof(void *));
task->queue = xQueueCreate(1, sizeof(rg_task_msg_t));
(task->func)(task->arg);
// rg_queue_free(task->queue);
vQueueDelete(task->queue);
memset(task, 0, sizeof(rg_task_t));
vTaskDelete(NULL);
}
Expand All @@ -500,9 +503,7 @@ static int task_wrapper(void *arg)
{
rg_task_t *task = arg;
task->handle = SDL_ThreadID();
// task->queue = rg_queue_create(1, sizeof(void *));
(task->func)(task->arg);
// rg_queue_free(task->queue);
memset(task, 0, sizeof(rg_task_t));
return 0;
}
Expand All @@ -517,7 +518,7 @@ rg_task_t *rg_task_create(const char *name, void (*taskFunc)(void *arg), void *a
{
if (tasks[i].func)
continue;
task = &tasks[i];
task = memset(&tasks[i], 0, sizeof(rg_task_t));
break;
}
RG_ASSERT(task, "Out of task slots");
Expand Down Expand Up @@ -548,7 +549,7 @@ rg_task_t *rg_task_create(const char *name, void (*taskFunc)(void *arg), void *a

rg_task_t *rg_task_find(const char *name)
{
RG_ASSERT_ARG(name != NULL);
RG_ASSERT_ARG(name);
for (size_t i = 0; i < RG_COUNT(tasks); ++i)
{
if (strncmp(tasks[i].name, name, 16) == 0)
Expand All @@ -557,62 +558,104 @@ rg_task_t *rg_task_find(const char *name)
return NULL;
}

void rg_task_delay(uint32_t ms)
rg_task_t *rg_task_current(void)
{
#if defined(ESP_PLATFORM)
vTaskDelay(pdMS_TO_TICKS(ms));
TaskHandle_t handle = xTaskGetCurrentTaskHandle();
#elif defined(RG_TARGET_SDL2)
SDL_PumpEvents();
SDL_Delay(ms);
SDL_threadID handle = SDL_ThreadID();
#endif
for (size_t i = 0; i < RG_COUNT(tasks); ++i)
{
if (tasks[i].handle == handle)
return &tasks[i];
}
return NULL;
}

void rg_task_yield(void)
bool rg_task_send(rg_task_t *task, const rg_task_msg_t *msg)
{
RG_ASSERT_ARG(task && msg);
#if defined(ESP_PLATFORM)
vPortYield();
return xQueueSend(task->queue, msg, portMAX_DELAY) == pdTRUE;
#elif defined(RG_TARGET_SDL2)
SDL_PumpEvents();
while (task->msgWaiting > 0)
continue;
task->msg = *msg;
task->msgWaiting = 1;
return true;
#endif
}

rg_queue_t *rg_queue_create(size_t length, size_t itemSize)
{
return (rg_queue_t *)xQueueCreate(length, itemSize);
}

void rg_queue_free(rg_queue_t *queue)
bool rg_task_peek(rg_task_msg_t *out)
{
if (queue)
vQueueDelete((QueueHandle_t)queue);
rg_task_t *task = rg_task_current();
bool success = false;
if (!task || !out)
return false;
// task->blocked = true;
#if defined(ESP_PLATFORM)
success = xQueuePeek(task->queue, out, portMAX_DELAY) == pdTRUE;
#elif defined(RG_TARGET_SDL2)
while (task->msgWaiting < 1)
continue;
*out = task->msg;
#endif
// task->blocked = false;
return success;
}

bool rg_queue_send(rg_queue_t *queue, const void *item, int timeoutMS)
bool rg_task_receive(rg_task_msg_t *out)
{
int ticks = timeoutMS < 0 ? portMAX_DELAY : pdMS_TO_TICKS(timeoutMS);
return xQueueSend((QueueHandle_t)queue, item, ticks) == pdTRUE;
rg_task_t *task = rg_task_current();
bool success = false;
if (!task || !out)
return false;
// task->blocked = true;
#if defined(ESP_PLATFORM)
success = xQueueReceive(task->queue, out, portMAX_DELAY) == pdTRUE;
#elif defined(RG_TARGET_SDL2)
while (task->msgWaiting < 1)
continue;
*out = task->msg;
task->msgWaiting = 0;
#endif
// task->blocked = false;
return success;
}

bool rg_queue_receive(rg_queue_t *queue, void *out, int timeoutMS)
size_t rg_task_messages_waiting(rg_task_t *task)
{
int ticks = timeoutMS < 0 ? portMAX_DELAY : pdMS_TO_TICKS(timeoutMS);
return xQueueReceive((QueueHandle_t)queue, out, ticks) == pdTRUE;
if (!task) task = rg_task_current();
#if defined(ESP_PLATFORM)
return uxQueueMessagesWaiting(task->queue);
#elif defined(RG_TARGET_SDL2)
return task->msgWaiting;
#endif
}

bool rg_queue_peek(rg_queue_t *queue, void *out, int timeoutMS)
{
int ticks = timeoutMS < 0 ? portMAX_DELAY : pdMS_TO_TICKS(timeoutMS);
return xQueuePeek((QueueHandle_t)queue, out, ticks) == pdTRUE;
}
// bool rg_task_is_blocked(rg_task_t *task)
// {
// return task->blocked;
// }

size_t rg_queue_messages_waiting(rg_queue_t *queue)
void rg_task_delay(uint32_t ms)
{
return uxQueueMessagesWaiting((QueueHandle_t)queue);
#if defined(ESP_PLATFORM)
vTaskDelay(pdMS_TO_TICKS(ms));
#elif defined(RG_TARGET_SDL2)
SDL_PumpEvents();
SDL_Delay(ms);
#endif
}

size_t rg_queue_spaces_available(rg_queue_t *queue)
void rg_task_yield(void)
{
return uxQueueSpacesAvailable((QueueHandle_t)queue);
#if defined(ESP_PLATFORM)
vPortYield();
#elif defined(RG_TARGET_SDL2)
SDL_PumpEvents();
#endif
}

rg_mutex_t *rg_mutex_create(void)
Expand Down
27 changes: 16 additions & 11 deletions components/retro-go/rg_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,25 +250,30 @@ void rg_system_save_time(void);

// Wrappers for the OS' task/thread creation API. It also keeps track of handles for debugging purposes...
typedef struct rg_task_s rg_task_t;
typedef struct
{
int type; // Negative values are reserved
union
{
const void *dataPtr;
uint32_t dataInt;
};
} rg_task_msg_t;
#define RG_TASK_MSG_STOP -1
rg_task_t *rg_task_create(const char *name, void (*taskFunc)(void *arg), void *arg, size_t stackSize, int priority, int affinity);
rg_task_t *rg_task_find(const char *name);
rg_task_t *rg_task_current(void);
bool rg_task_send(rg_task_t *task, const rg_task_msg_t *msg);
bool rg_task_peek(rg_task_msg_t *out);
bool rg_task_receive(rg_task_msg_t *out);
bool rg_task_is_blocked(rg_task_t *task);
size_t rg_task_messages_waiting(rg_task_t *task);
// The main difference between rg_task_delay and rg_usleep is that rg_task_delay will yield
// to other tasks and will not busy wait time smaller than a tick. Meaning rg_usleep
// is more accurate but rg_task_delay is more multitasking-friendly.
void rg_task_delay(uint32_t ms);
void rg_task_yield(void);

// Wrapper for FreeRTOS queues, which are essentially inter-task communication primitives
// Retro-Go uses them for locks and message passing. Not sure how we could easily replicate in SDL2 yet...
typedef void rg_queue_t;
rg_queue_t *rg_queue_create(size_t length, size_t itemSize);
void rg_queue_free(rg_queue_t *queue);
bool rg_queue_send(rg_queue_t *queue, const void *item, int timeoutMS);
bool rg_queue_receive(rg_queue_t *queue, void *out, int timeoutMS);
bool rg_queue_peek(rg_queue_t *queue, void *out, int timeoutMS);
size_t rg_queue_messages_waiting(rg_queue_t *queue);
size_t rg_queue_spaces_available(rg_queue_t *queue);

typedef void rg_mutex_t;
rg_mutex_t *rg_mutex_create(void);
void rg_mutex_free(rg_mutex_t *mutex);
Expand Down
16 changes: 7 additions & 9 deletions fmsx/main/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

static rg_surface_t *updates[2];
static rg_surface_t *currentUpdate;
static rg_queue_t *audioQueue;
static rg_task_t *audioQueue;
static rg_app_t *app;

static int JoyState, LastKey, InMenu, InKeyboard;
Expand Down Expand Up @@ -343,7 +343,7 @@ void PlayAllSound(int uSec)
{
int64_t start = rg_system_timer();
unsigned int samples = 2 * uSec * AUDIO_SAMPLE_RATE / 1000000;
rg_queue_send(audioQueue, &samples, 100);
rg_task_send(audioQueue, &(rg_task_msg_t){.dataInt = samples});
FrameStartTime += rg_system_timer() - start;
}

Expand Down Expand Up @@ -419,12 +419,11 @@ static rg_gui_event_t fmsx_menu_cb(rg_gui_option_t *option, rg_gui_event_t event
static void audioTask(void *arg)
{
RG_LOGI("task started");
while (true)
rg_task_msg_t msg;
while (rg_task_peek(&msg))
{
unsigned int samples;
rg_queue_peek(audioQueue, &samples, -1);
RenderAndPlayAudio(samples);
rg_queue_receive(audioQueue, &samples, -1);
RenderAndPlayAudio(msg.dataInt);
rg_task_receive(&msg);
}
}

Expand Down Expand Up @@ -490,8 +489,7 @@ void app_main(void)
}
argv[argc++] = app->romPath;

audioQueue = rg_queue_create(1, sizeof(unsigned int));
rg_task_create("audioTask", &audioTask, NULL, 4096, RG_TASK_PRIORITY_2, 1);
audioQueue = rg_task_create("audioTask", &audioTask, NULL, 4096, RG_TASK_PRIORITY_2, 1);

RG_LOGI("fMSX start");
fmsx_main(argc, (char **)argv);
Expand Down

0 comments on commit cecd9ef

Please sign in to comment.