From 37ef55a23336f289bd21b416662443effe8f8ba5 Mon Sep 17 00:00:00 2001 From: Ian Craggs Date: Tue, 3 Sep 2024 18:49:27 +0100 Subject: [PATCH] Synchronize sendMessage/send #1494 --- src/MQTTAsync.c | 32 ++++++++++++++++++++++++++++++-- src/MQTTAsyncUtils.c | 12 ------------ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/MQTTAsync.c b/src/MQTTAsync.c index a21bcd4d..a62a3d03 100644 --- a/src/MQTTAsync.c +++ b/src/MQTTAsync.c @@ -980,6 +980,13 @@ int MQTTAsync_reconnect(MQTTAsync handle) } +int MQTTAsync_inCallback() +{ + thread_id_type thread_id = Paho_thread_getid(); + return thread_id == sendThread_id || thread_id == receiveThread_id; +} + + int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, const int* qos, MQTTAsync_responseOptions* response) { MQTTAsyncs* m = handle; @@ -989,6 +996,8 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, con int msgid = 0; FUNC_ENTRY; + if (!MQTTAsync_inCallback()) + MQTTAsync_lock_mutex(mqttasync_mutex); if (m == NULL || m->c == NULL) rc = MQTTASYNC_FAILURE; else if (m->c->connected == 0) @@ -1092,6 +1101,8 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, con rc = PAHO_MEMORY_ERROR; exit: + if (!MQTTAsync_inCallback()) + MQTTAsync_unlock_mutex(mqttasync_mutex); FUNC_EXIT_RC(rc); return rc; } @@ -1116,6 +1127,8 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M int msgid = 0; FUNC_ENTRY; + if (!MQTTAsync_inCallback()) + MQTTAsync_lock_mutex(mqttasync_mutex); if (m == NULL || m->c == NULL) rc = MQTTASYNC_FAILURE; else if (m->c->connected == 0) @@ -1180,6 +1193,8 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M rc = MQTTAsync_addCommand(unsub, sizeof(unsub)); exit: + if (!MQTTAsync_inCallback()) + MQTTAsync_unlock_mutex(mqttasync_mutex); FUNC_EXIT_RC(rc); return rc; } @@ -1204,6 +1219,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen int msgid = 0; FUNC_ENTRY; + if (!MQTTAsync_inCallback()) + MQTTAsync_lock_mutex(mqttasync_mutex); if (m == NULL || m->c == NULL) rc = MQTTASYNC_FAILURE; else if (m->c->connected == 0) @@ -1287,6 +1304,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen rc = MQTTAsync_addCommand(pub, sizeof(pub)); exit: + if (!MQTTAsync_inCallback()) + MQTTAsync_unlock_mutex(mqttasync_mutex); FUNC_EXIT_RC(rc); return rc; } @@ -1324,10 +1343,19 @@ int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const M int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options) { + int rc = 0; + + FUNC_ENTRY; + if (!MQTTAsync_inCallback()) + MQTTAsync_lock_mutex(mqttasync_mutex); if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1)) - return MQTTASYNC_BAD_STRUCTURE; + rc = MQTTASYNC_BAD_STRUCTURE; else - return MQTTAsync_disconnect1(handle, options, 0); + rc = MQTTAsync_disconnect1(handle, options, 0); + if (!MQTTAsync_inCallback()) + MQTTAsync_unlock_mutex(mqttasync_mutex); + FUNC_EXIT_RC(rc); + return rc; } diff --git a/src/MQTTAsyncUtils.c b/src/MQTTAsyncUtils.c index 6605014d..2e5c27fb 100644 --- a/src/MQTTAsyncUtils.c +++ b/src/MQTTAsyncUtils.c @@ -2754,19 +2754,9 @@ int MQTTAsync_assignMsgId(MQTTAsyncs* m) { int start_msgid; int msgid; - thread_id_type thread_id = 0; - int locked = 0; /* need to check: commands list and response list for a client */ FUNC_ENTRY; - /* We might be called in a callback. In which case, this mutex will be already locked. */ - thread_id = Paho_thread_getid(); - if (thread_id != sendThread_id && thread_id != receiveThread_id) - { - MQTTAsync_lock_mutex(mqttasync_mutex); - locked = 1; - } - /* Fetch last message ID in locked state */ start_msgid = m->c->msgID; msgid = start_msgid; @@ -2787,8 +2777,6 @@ int MQTTAsync_assignMsgId(MQTTAsyncs* m) MQTTAsync_unlock_mutex(mqttcommand_mutex); if (msgid != 0) m->c->msgID = msgid; - if (locked) - MQTTAsync_unlock_mutex(mqttasync_mutex); FUNC_EXIT_RC(msgid); return msgid; }