--- a/mqtt/mqtt_client_gen.py Mon Jul 22 12:12:33 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Mon Jul 22 16:09:12 2024 +0200
@@ -9,6 +9,8 @@
+import util.paths as paths # from perfect_hash import generate_code, IntSaltHash
@@ -299,363 +301,10 @@
writer.writerow([direction] + row)
def GenerateC(self, path, locstr, config):
- template = """/* code generated by beremiz MQTT extension */
-#include "MQTTClientPersistence.h"
-#define _Log(level, ...) \\
- snprintf(mstr, 255, __VA_ARGS__); \\
- LogMessage(level, mstr, strlen(mstr)); \\
- printf(__VA_ARGS__); \\
-#define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
-#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
-#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
-void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
- LogInfo("Paho MQTT Trace : %d, %s\\n", level, message);
-#define DECL_VAR(iec_type, C_type, c_loc_name) \\
-static C_type PLC_##c_loc_name##_buf = 0; \\
-static C_type MQTT_##c_loc_name##_buf = 0; \\
-static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \\
-C_type *c_loc_name = &PLC_##c_loc_name##_buf;
-static MQTTClient client;
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-/* condition to quit publish thread */
-static int MQTT_stop_thread = 0;
-/* condition to wakeup publish thread */
-static int MQTT_any_pub_var_changed = 0;
-/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
-static pthread_mutex_t MQTT_mutex;
-/* wakeup publish thread when PLC changed published variable */
-static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
-static pthread_t publishThread;
-#define INIT_TOPIC(topic, iec_type, c_loc_name) \\
-{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}},
- const char *topic; //null terminated topic string
- void *mqtt_pdata; // pointer to data from/for MQTT stack
- int *mqtt_pchanged; // pointer to changed flag
- __IEC_types_enum vartype;
-static int _connect_mqtt(void)
- MQTTProperties props = MQTTProperties_initializer;
- MQTTProperties willProps = MQTTProperties_initializer;
- MQTTResponse response = MQTTResponse_initializer;
- response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
- rc = response.reasonCode;
- MQTTResponse_free(response);
- rc = MQTTClient_connect(client, &conn_opts);
-void __cleanup_{locstr}(void)
- /* stop publish thread */
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
- /* unblock publish thread so that it can stop normally */
- pthread_cond_signal(&MQTT_new_data);
- pthread_mutex_unlock(&MQTT_mutex);
- pthread_join(publishThread, NULL);
- if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
- if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS)
- LogError("MQTT Failed to disconnect, return code %d\\n", rc);
- MQTTClient_destroy(&client);
-void connectionLost(void* context, char* reason)
- LogWarning("ConnectionLost, reconnecting\\n");
- // rc = _connect_mqtt();
- // if (rc != MQTTCLIENT_SUCCESS) {{
- // LogError("MQTT reconnect Failed, waiting 5 seconds, return code %d\\n", rc);
-int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
- int size = sizeof(topics) / sizeof(topics[0]);
- // bisect topic among subscribed topics
- mid = low + (high - low) / 2;
- res = strncmp(topics[mid].topic, topicName, topicLen);
- // Check if key is present at mid
- // If key greater, ignore left half
- // If key is smaller, ignore right half
- // If we reach here, then the element was not present
- LogWarning("MQTT unknown topic: %s", topicName);
- if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
- memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
- *topics[mid].mqtt_pchanged = 1;
- LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.",
- topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
- MQTTClient_freeMessage(&message);
- MQTTClient_free(topicName);
-#define INIT_NoAuth() \\
- LogInfo("MQTT Init no auth\\n");
-#define INIT_x509(PrivateKey, Certificate) \\
- LogInfo("MQTT Init x509 %s,%s\\n", PrivateKey, Certificate);
-#define INIT_UserPassword(User, Password) \\
- LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password); \\
- conn_opts.username = User; \\
- conn_opts.password = Password;
-#define _SUBSCRIBE(Topic, QoS) \\
- MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\
- /* when using MQTT5 responce code is 1 for some reason even if no error */ \\
- rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \\
- MQTTResponse_free(response);
-#define _SUBSCRIBE(Topic, QoS) \\
- rc = MQTTClient_subscribe(client, #Topic, QoS);
-#define INIT_SUBSCRIPTION(Topic, QoS) \\
- _SUBSCRIBE(Topic, QoS) \\
- if (rc != MQTTCLIENT_SUCCESS) \\
- LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\
-#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\
- &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \\
- rc = response.reasonCode; \\
- MQTTResponse_free(response);
-#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- rc = MQTTClient_publish(client, #Topic, sizeof(C_type), \\
- &PLC_##c_loc_name##_buf, QoS, Retained, NULL);
-#define INIT_PUBLICATION(Topic, QoS, C_type, c_loc_name, Retained) \\
- _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- if (rc != MQTTCLIENT_SUCCESS) \\
- LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\
- /* TODO update status variable accordingly */ \\
-#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \\
- if(MQTT_##c_loc_name##_state == CHANGED) \\
- _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- if (rc != MQTTCLIENT_SUCCESS) \\
- LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc); \\
- /* TODO update status variable accordingly */ \\
- MQTT_##c_loc_name##_state = UNCHANGED; \\
-static void *__publish_thread(void *_unused) {{
- while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
- pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
- if(MQTT_any_pub_var_changed && MQTTClient_isConnected(client)){{
- /* publish changes, and reset variable's state to UNCHANGED */
- MQTT_any_pub_var_changed = 0;
- pthread_mutex_unlock(&MQTT_mutex);
- if(MQTT_stop_thread) break;
- if(!MQTT_stop_thread){{
- /* if thread exits outside of normal shutdown, report error*/
- LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc);
-int __init_{locstr}(int argc,char **argv)
- char *clientID = "{clientID}";
- MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
- conn_opts.MQTTVersion = MQTTVERSION_5;
- conn_opts.cleanstart = 1;
- createOpts.MQTTVersion = MQTTVERSION_5;
- conn_opts.cleansession = 1;
- MQTTClient_setTraceCallback(trace_callback);
- MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
- rc = MQTTClient_createWithOptions(
- &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
- if (rc != MQTTCLIENT_SUCCESS)
- LogError("MQTT Failed to create client, return code %d\\n", rc);
- rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
- if (rc != MQTTCLIENT_SUCCESS)
- LogError("MQTT Failed to set callbacks, return code %d\\n", rc);
- if (rc != MQTTCLIENT_SUCCESS) {{
- LogError("MQTT Connect Failed, return code %d\\n", rc);
- /* TODO start publish thread */
- rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
-#define READ_VALUE(c_loc_name, C_type) \\
- if(MQTT_##c_loc_name##_state == CHANGED){{ \\
- /* TODO care about endianess */ \\
- PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \\
- MQTT_##c_loc_name##_state = UNCHANGED; \\
-void __retrieve_{locstr}(void)
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
- pthread_mutex_unlock(&MQTT_mutex);
-#define WRITE_VALUE(c_loc_name, C_type) \\
- /* TODO care about endianess */ \\
- if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\
- MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\
- MQTT_##c_loc_name##_state = CHANGED; \\
- MQTT_any_pub_var_changed = 1; \\
-void __publish_{locstr}(void)
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
- MQTT_any_pub_var_changed = 0;
- /* copy PLC_* variables to MQTT_*, and mark those who changed */
- /* if any change detcted, unblock publish thread */
- if(MQTT_any_pub_var_changed){{
- pthread_cond_signal(&MQTT_new_data);
- pthread_mutex_unlock(&MQTT_mutex);
- /* TODO if couldn't lock mutex set status variable accordingly */
+ c_template_filepath = paths.AbsNeighbourFile(__file__, "mqtt_template.c") + c_template_file = open(c_template_filepath , 'rb') + c_template = c_template_file.read() + c_template_file.close() @@ -716,7 +365,7 @@
formatdict["retrieve"] += """
READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
- Ccode = template.format(**formatdict)
+ Ccode = c_template.format(**formatdict) --- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mqtt/mqtt_template.c Mon Jul 22 16:09:12 2024 +0200
@@ -0,0 +1,343 @@
+/* code generated by beremiz MQTT extension */ +#include "MQTTClientPersistence.h" +#define _Log(level, ...) \ + /* char mstr[256]; */ \ + /* snprintf(mstr, 255, __VA_ARGS__); */ \ + /* LogMessage(level, mstr, strlen(mstr)); */ \ +#define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); +#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); +#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); +void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) + LogInfo("Paho MQTT Trace : %d, %s\n", level, message); +#define DECL_VAR(iec_type, C_type, c_loc_name) \ +static C_type PLC_##c_loc_name##_buf = 0; \ +static C_type MQTT_##c_loc_name##_buf = 0; \ +static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \ +C_type *c_loc_name = &PLC_##c_loc_name##_buf; +static MQTTClient client; +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; +/* condition to quit publish thread */ +static int MQTT_stop_thread = 0; +/* condition to wakeup publish thread */ +static int MQTT_any_pub_var_changed = 0; +/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */ +static pthread_mutex_t MQTT_mutex; +/* wakeup publish thread when PLC changed published variable */ +static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; +static pthread_t publishThread; +#define INIT_TOPIC(topic, iec_type, c_loc_name) \ +{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}}, + const char *topic; //null terminated topic string + void *mqtt_pdata; // pointer to data from/for MQTT stack + int *mqtt_pchanged; // pointer to changed flag + __IEC_types_enum vartype; +static int _connect_mqtt(void) + MQTTProperties props = MQTTProperties_initializer; + MQTTProperties willProps = MQTTProperties_initializer; + MQTTResponse response = MQTTResponse_initializer; + response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); + rc = response.reasonCode; + MQTTResponse_free(response); + rc = MQTTClient_connect(client, &conn_opts); +void __cleanup_{locstr}(void) + /* stop publish thread */ + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + /* unblock publish thread so that it can stop normally */ + pthread_cond_signal(&MQTT_new_data); + pthread_mutex_unlock(&MQTT_mutex); + pthread_join(publishThread, NULL); + if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) + if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS) + LogError("MQTT Failed to disconnect, return code %d\n", rc); + MQTTClient_destroy(&client); +int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) + int size = sizeof(topics) / sizeof(topics[0]); + // bisect topic among subscribed topics + mid = low + (high - low) / 2; + res = strncmp(topics[mid].topic, topicName, topicLen); + // Check if key is present at mid + // If key greater, ignore left half + // If key is smaller, ignore right half + // If we reach here, then the element was not present + LogWarning("MQTT unknown topic: %s", topicName); + if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{ + memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); + *topics[mid].mqtt_pchanged = 1; + LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", + topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + LogInfo("MQTT Init no auth\n"); +#define INIT_x509(PrivateKey, Certificate) \ + LogInfo("MQTT Init x509 %s,%s\n", PrivateKey, Certificate); +#define INIT_UserPassword(User, Password) \ + LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \ + conn_opts.username = User; \ + conn_opts.password = Password; +#define _SUBSCRIBE(Topic, QoS) \ + MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \ + /* when using MQTT5 responce code is 1 for some reason even if no error */ \ + rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \ + MQTTResponse_free(response); +#define _SUBSCRIBE(Topic, QoS) \ + rc = MQTTClient_subscribe(client, #Topic, QoS); +#define INIT_SUBSCRIPTION(Topic, QoS) \ + _SUBSCRIBE(Topic, QoS) \ + if (rc != MQTTCLIENT_SUCCESS) \ + LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \ +#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \ + &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \ + rc = response.reasonCode; \ + MQTTResponse_free(response); +#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + rc = MQTTClient_publish(client, #Topic, sizeof(C_type), \ + &PLC_##c_loc_name##_buf, QoS, Retained, NULL); +#define INIT_PUBLICATION(Topic, QoS, C_type, c_loc_name, Retained) \ + _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + if (rc != MQTTCLIENT_SUCCESS) \ + LogError("MQTT client failed to init publication of '%s', return code %d\n", #Topic, rc);\ + /* TODO update status variable accordingly */ \ +#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \ + if(MQTT_##c_loc_name##_state == CHANGED) \ + _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + if (rc != MQTTCLIENT_SUCCESS) \ + LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \ + /* TODO update status variable accordingly */ \ + MQTT_##c_loc_name##_state = UNCHANGED; \ +static void *__publish_thread(void *_unused) {{ + while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{ + pthread_cond_wait(&MQTT_new_data, &MQTT_mutex); + if(MQTT_any_pub_var_changed && MQTTClient_isConnected(client)){{ + /* publish changes, and reset variable's state to UNCHANGED */ + MQTT_any_pub_var_changed = 0; + pthread_mutex_unlock(&MQTT_mutex); + if(MQTT_stop_thread) break; + if(!MQTT_stop_thread){{ + /* if thread exits outside of normal shutdown, report error*/ + LogError("MQTT client thread exited unexpectedly, return code %d\n", rc); +int __init_{locstr}(int argc,char **argv) + char *clientID = "{clientID}"; + MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; + conn_opts.MQTTVersion = MQTTVERSION_5; + conn_opts.cleanstart = 1; + createOpts.MQTTVersion = MQTTVERSION_5; + conn_opts.cleansession = 1; + MQTTClient_setTraceCallback(trace_callback); + MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); + rc = MQTTClient_createWithOptions( + &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); + if (rc != MQTTCLIENT_SUCCESS) + LogError("MQTT Failed to create client, return code %d\n", rc); + rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL); + if (rc != MQTTCLIENT_SUCCESS) + LogError("MQTT Failed to set callbacks, return code %d\n", rc); + if (rc != MQTTCLIENT_SUCCESS) {{ + LogError("MQTT Connect Failed, return code %d\n", rc); + /* TODO start publish thread */ + rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL); +#define READ_VALUE(c_loc_name, C_type) \ + if(MQTT_##c_loc_name##_state == CHANGED){{ \ + /* TODO care about endianess */ \ + PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \ + MQTT_##c_loc_name##_state = UNCHANGED; \ +void __retrieve_{locstr}(void) + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + pthread_mutex_unlock(&MQTT_mutex); +#define WRITE_VALUE(c_loc_name, C_type) \ + /* TODO care about endianess */ \ + if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \ + MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \ + MQTT_##c_loc_name##_state = CHANGED; \ + MQTT_any_pub_var_changed = 1; \ +void __publish_{locstr}(void) + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + MQTT_any_pub_var_changed = 0; + /* copy PLC_* variables to MQTT_*, and mark those who changed */ + /* if any change detcted, unblock publish thread */ + if(MQTT_any_pub_var_changed){{ + pthread_cond_signal(&MQTT_new_data); + pthread_mutex_unlock(&MQTT_mutex); + /* TODO if couldn't lock mutex set status variable accordingly */