--- a/mqtt/mqtt_template.c Mon Jul 22 16:13:27 2024 +0200
+++ b/mqtt/mqtt_template.c Tue Jul 23 11:05:46 2024 +0200
@@ -51,8 +51,11 @@
/* condition to wakeup publish thread */
static int MQTT_any_pub_var_changed = 0;
-/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
-static pthread_mutex_t MQTT_mutex;
+/* mutex to keep incoming PLC data consistent */ +static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER; +/* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */ +static pthread_mutex_t MQTT_publish_mutex = PTHREAD_MUTEX_INITIALIZER; /* wakeup publish thread when PLC changed published variable */
static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
@@ -97,10 +100,10 @@
/* stop publish thread */
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ if (pthread_mutex_lock(&MQTT_publish_mutex) == 0){{ /* unblock publish thread so that it can stop normally */
pthread_cond_signal(&MQTT_new_data);
- pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_publish_mutex); pthread_join(publishThread, NULL);
@@ -146,8 +149,11 @@
if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
- memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
- *topics[mid].mqtt_pchanged = 1;
+ if (pthread_mutex_lock(&MQTT_retrieve_mutex) == 0){{ + memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); + *topics[mid].mqtt_pchanged = 1; + pthread_mutex_unlock(&MQTT_retrieve_mutex); LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.",
topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
@@ -231,8 +237,8 @@
static void *__publish_thread(void *_unused) {{
- while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
- pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
+ while((rc = pthread_mutex_lock(&MQTT_publish_mutex)) == 0 && !MQTT_stop_thread){{ + pthread_cond_wait(&MQTT_new_data, &MQTT_publish_mutex); int is_connected = MQTTClient_isConnected(client);
if(MQTT_any_pub_var_changed && is_connected){{
@@ -249,7 +255,7 @@
- pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_publish_mutex); if(MQTT_stop_thread) break;
@@ -286,29 +292,37 @@
if (rc != MQTTCLIENT_SUCCESS)
LogError("MQTT Failed to create client, return code %d\n", rc);
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
if (rc != MQTTCLIENT_SUCCESS)
LogError("MQTT Failed to set callbacks, return code %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS) {{
LogError("MQTT Connect Failed, return code %d\n", rc);
- /* TODO start publish thread */
+ /* start publish thread */ rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
+ LogError("MQTT cannot create thread, return code %d\n", rc); + MQTTClient_destroy(&client); #define READ_VALUE(c_loc_name, C_type) \
@@ -320,9 +334,9 @@
void __retrieve_{locstr}(void)
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ if (pthread_mutex_trylock(&MQTT_retrieve_mutex) == 0){{ - pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_retrieve_mutex); @@ -336,7 +350,7 @@
void __publish_{locstr}(void)
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ if (pthread_mutex_trylock(&MQTT_publish_mutex) == 0){{ MQTT_any_pub_var_changed = 0;
/* copy PLC_* variables to MQTT_*, and mark those who changed */
@@ -344,7 +358,7 @@
if(MQTT_any_pub_var_changed){{
pthread_cond_signal(&MQTT_new_data);
- pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_publish_mutex); /* TODO if couldn't lock mutex set status variable accordingly */