--- a/mqtt/mqtt_client_gen.py Mon Jul 15 09:40:11 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Tue Jul 16 09:41:45 2024 +0200
@@ -367,9 +367,13 @@
LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
#define DECL_VAR(iec_type, C_type, c_loc_name) \\
static C_type PLC_##c_loc_name##_buf = 0; \\
static C_type MQTT_##c_loc_name##_buf = 0; \\
+static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \\ C_type *c_loc_name = &PLC_##c_loc_name##_buf;
@@ -380,7 +384,21 @@
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent
+/* condition to quit publish thread */ +static int MQTT_stop_thread = 0; +/* condition to wakeup publish thread */ +static int MQTT_any_pub_var_changed = 0; +/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */ +static pthread_mutex_t MQTT_mutex; +/* wakeup publish thread when PLC changed published variable */ +static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; +static pthread_t publishThread; #define INIT_TOPIC(topic, iec_type, c_loc_name) \\
{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
@@ -480,9 +498,6 @@
#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\
@@ -501,11 +516,45 @@
_PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
if (rc != MQTTCLIENT_SUCCESS) \\
- LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\
+ LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\ + /* TODO update status variable accordingly */ \\ +#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \\ + if(MQTT_##c_loc_name##_state == CHANGED) \\ + _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ + if (rc != MQTTCLIENT_SUCCESS) \\ + LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc); \\ + /* TODO update status variable accordingly */ \\ + MQTT_##c_loc_name##_state = UNCHANGED; \\ +static void *__publish_thread(void *_unused) {{ + while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{ + pthread_cond_wait(&MQTT_new_data, &MQTT_mutex); + if(MQTT_any_pub_var_changed){{ + /* publish changes, and reset variable's state to UNCHANGED */ + MQTT_any_pub_var_changed = 0; + pthread_mutex_unlock(&MQTT_mutex); + if(!MQTT_stop_thread){{ + /* if thread exits outside of normal shutdown, report error*/ + LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc); int __init_{locstr}(int argc,char **argv)
@@ -552,6 +601,7 @@
/* TODO start publish thread */
+ rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL); @@ -567,28 +617,41 @@
#define WRITE_VALUE(c_loc_name, C_type) \\
- MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf;
+ if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\ + MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\ + MQTT_##c_loc_name##_state = CHANGED; \\ + MQTT_any_pub_var_changed = 1; \\ void __publish_{locstr}(void)
- /* TODO try take mutex */
+ if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + MQTT_any_pub_var_changed = 0; + /* copy PLC_* variables to MQTT_*, and mark those who changed */
- /* TODO unblock publish thread */
+ /* if any change detcted, unblock publish thread */ + if(MQTT_any_pub_var_changed){{ + pthread_cond_signal(&MQTT_new_data); + pthread_mutex_unlock(&MQTT_mutex); + /* TODO if couldn't lock mutex set status variable accordingly */
- clientID = config["clientID"],
+ clientID = config["clientID"], @@ -637,6 +700,8 @@
INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
formatdict["publish"] += """
WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
+ formatdict["publish_changes"] += """ + PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) Ccode = template.format(**formatdict)